diff --git a/src/somd2/app/run.py b/src/somd2/app/run.py index f92fd55..c2c7d61 100644 --- a/src/somd2/app/run.py +++ b/src/somd2/app/run.py @@ -39,7 +39,7 @@ def cli(): from somd2 import _logger from somd2.config import Config - from somd2.runner import Runner + from somd2.runner import Runner, RepexRunner from somd2.io import yaml_to_dict @@ -80,7 +80,10 @@ def cli(): config = Config(**args) # Instantiate a Runner object to run the simulation. - runner = Runner(system, config) + if config.replica_exchange: + runner = RepexRunner(system, config) + else: + runner = Runner(system, config) # Run the simulation. runner.run() diff --git a/src/somd2/config/_config.py b/src/somd2/config/_config.py index dc3d370..37385ec 100644 --- a/src/somd2/config/_config.py +++ b/src/somd2/config/_config.py @@ -119,7 +119,8 @@ def __init__( platform="auto", max_threads=None, max_gpus=None, - run_parallel=True, + oversubscription_factor=1, + replica_exchange=False, output_directory="output", restart=False, write_config=True, @@ -196,17 +197,16 @@ def __init__( Lennard-Jones interaction. restraints: sire.mm._MM.Restraints - A single set of restraints, or a list of - sets of restraints that will be applied to - the atoms during the simulation. + A single set of restraints, or a list of sets of restraints that + will be applied to the atoms during the simulation. constraint: str Constraint type to use for non-perturbable molecules. perturbable_constraint: str Constraint type to use for perturbable molecules. If None, then - this will be set according to what is chosen for the - non-perturbable constraint. + this will be set according to what is chosen for the non-perturbable + constraint. include_constrained_energies: bool Whether to include constrained energies in the potential. @@ -244,7 +244,8 @@ def __init__( Whether to use constraints during equilibration. energy_frequency: str - Frequency at which to output energy data. + Frequency at which to output energy data. If running using 'replica_exchange', + then this will also be the frequency at which replica swaps are attempted. save_trajectories: bool Whether to save trajectory files @@ -270,8 +271,12 @@ def __init__( Maximum number of GPUs to use for simulation (Default None, uses all available.) Does nothing if platform is set to CPU. - run_parallel: bool - Whether to run simulation in parallel. + oversubscription_factor: int + Factor by which to oversubscribe jobs on GPUs during replica exchange simulations. + + replica_exchange: bool + Whether to run replica exchange simulation. Currently this can only be used when + GPU resources are available. output_directory: str Path to a directory to store output files. @@ -349,7 +354,8 @@ def __init__( self.platform = platform self.max_threads = max_threads self.max_gpus = max_gpus - self.run_parallel = run_parallel + self.oversubscription_factor = oversubscription_factor + self.replica_exchange = replica_exchange self.restart = restart self.somd1_compatibility = somd1_compatibility self.pert_file = pert_file @@ -1203,9 +1209,12 @@ def max_gpus(self, max_gpus): ): if "CUDA_VISIBLE_DEVICES" in _os.environ: self._max_gpus = len(_os.environ["CUDA_VISIBLE_DEVICES"].split(",")) + elif "OPENCL_VISIBLE_DEVICES" in _os.environ: + self._max_gpus = len(_os.environ["OPENCL_VISIBLE_DEVICES"].split(",")) + elif "HIP_VISIBLE_DEVICES" in _os.environ: + self._max_gpus = len(_os.environ["HIP_VISIBLE_DEVICES"].split(",")) else: self._max_gpus = 0 - else: try: self._max_gpus = int(max_gpus) @@ -1217,14 +1226,31 @@ def max_gpus(self, max_gpus): ) @property - def run_parallel(self): - return self._run_parallel - - @run_parallel.setter - def run_parallel(self, run_parallel): - if not isinstance(run_parallel, bool): - raise ValueError("'run_parallel' must be of type 'bool'") - self._run_parallel = run_parallel + def oversubscription_factor(self): + return self._oversubscription_factor + + @oversubscription_factor.setter + def oversubscription_factor(self, oversubscription_factor): + if not isinstance(oversubscription_factor, int): + try: + oversubscription_factor = int(oversubscription_factor) + except: + raise ValueError("'oversubscription_factor' must be of type 'int'") + + if oversubscription_factor < 1: + raise ValueError("'oversubscription_factor' must be greater than 1") + + self._oversubscription_factor = oversubscription_factor + + @property + def replica_exchange(self): + return self._replica_exchange + + @replica_exchange.setter + def replica_exchange(self, replica_exchange): + if not isinstance(replica_exchange, bool): + raise ValueError("'replica_exchange' must be of type 'bool'") + self._replica_exchange = replica_exchange @property def restart(self): diff --git a/src/somd2/io/_io.py b/src/somd2/io/_io.py index 4930b1e..e0bcca6 100644 --- a/src/somd2/io/_io.py +++ b/src/somd2/io/_io.py @@ -50,11 +50,13 @@ def dataframe_to_parquet(df, metadata, filepath=None, filename=None): metadata: dict Dictionary containing metadata to be saved with the dataframe. - Currently just temperature and lambda value. filepath: str or pathlib.PosixPath The of the parent directory in to which the parquet file will be saved. If None, save to current working directory. + + filename: str + The name of the parquet file to be saved. If None, a default name will be used. """ if filepath is None: @@ -81,7 +83,7 @@ def dataframe_to_parquet(df, metadata, filepath=None, filename=None): return filepath / filename -def dict_to_yaml(data_dict, path, filename="config.yaml"): +def dict_to_yaml(data_dict, filename="config.yaml", path=None): """ Write a dictionary to a YAML file. @@ -91,16 +93,20 @@ def dict_to_yaml(data_dict, path, filename="config.yaml"): data_dict: dict The dictionary to be written to a YAML file. - path: str or pathlib.PosixPath - The path to the YAML file to be written. - filename: str The name of the YAML file to be written (default 'config.yaml'). + + path: str or pathlib.PosixPath + The path to the YAML file to be written. """ import yaml as _yaml - try: + if path is None: + path = _Path(filename) + else: path = _Path(path) / filename + + try: # Ensure the parent directory for the file exists path.parent.mkdir(parents=True, exist_ok=True) # Open the file in write mode and write the dictionary as YAML diff --git a/src/somd2/runner/__init__.py b/src/somd2/runner/__init__.py index e0b62af..2f439ff 100644 --- a/src/somd2/runner/__init__.py +++ b/src/somd2/runner/__init__.py @@ -20,3 +20,4 @@ ##################################################################### from ._runner import * +from ._repex import * diff --git a/src/somd2/runner/_base.py b/src/somd2/runner/_base.py new file mode 100644 index 0000000..0d1b6fc --- /dev/null +++ b/src/somd2/runner/_base.py @@ -0,0 +1,1160 @@ +###################################################################### +# SOMD2: GPU accelerated alchemical free-energy engine. +# +# Copyright: 2023-2024 +# +# Authors: The OpenBioSim Team +# +# SOMD2 is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# SOMD2 is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with SOMD2. If not, see . +##################################################################### + +__all__ = ["RunnerBase"] + +from pathlib import Path as _Path + +import sire as _sr +from sire.system import System as _System + +from somd2 import _logger + +from ..config import Config as _Config +from ..io import dataframe_to_parquet as _dataframe_to_parquet +from ..io import dict_to_yaml as _dict_to_yaml +from ..io import parquet_append as _parquet_append +from .._utils import _lam_sym + + +class RunnerBase: + """ + Base class for the SOMD2 simulation runner. + """ + + def __init__(self, system, config): + """ + Constructor. + + Parameters + ---------- + + system: str, :class: `System ` + The perturbable system to be simulated. This can be either a path + to a stream file, or a Sire system object. + + config: :class: `Config ` + The configuration options for the simulation. + """ + + if not isinstance(system, (str, _System)): + raise TypeError("'system' must be of type 'str' or 'sire.system.System'") + + if isinstance(system, str): + # Try to load the stream file. + try: + self._system = _sr.stream.load(system) + except: + raise IOError(f"Unable to load system from stream file: '{system}'") + else: + self._system = system + + # Validate the configuration. + if not isinstance(config, _Config): + raise TypeError("'config' must be of type 'somd2.config.Config'") + self._config = config + self._config._extra_args = {} + + # Log the versions of somd2 and sire. + from somd2 import __version__, _sire_version, _sire_revisionid + + _logger.info(f"somd2 version: {__version__}") + _logger.info(f"sire version: {_sire_version}+{_sire_revisionid}") + + # Check whether we need to apply a perturbation to the reference system. + if self._config.pert_file is not None: + _logger.info( + f"Applying perturbation to reference system: {self._config.pert_file}" + ) + try: + from .._utils._somd1 import _apply_pert + + self._system = _apply_pert(self._system, self._config.pert_file) + except Exception as e: + raise IOError(f"Unable to apply perturbation to reference system: {e}") + + # Make sure the system contains perturbable molecules. + try: + self._system.molecules("property is_perturbable") + except KeyError: + raise KeyError("No perturbable molecules in the system") + + # Link properties to the lambda = 0 end state. + self._system = _sr.morph.link_to_reference(self._system) + + # Set the default configuration options. + + # Restrict the atomic properties used to define light atoms when + # applying constraints. + self._config._extra_args["check_for_h_by_max_mass"] = True + self._config._extra_args["check_for_h_by_mass"] = False + self._config._extra_args["check_for_h_by_element"] = False + self._config._extra_args["check_for_h_by_ambertype"] = False + + # Make sure that perturbable LJ sigmas aren't scaled to zero. + self._config._extra_args["fix_perturbable_zero_sigmas"] = True + + # We're running in SOMD1 compatibility mode. + if self._config.somd1_compatibility: + from .._utils._somd1 import _make_compatible + + # First, try to make the perturbation SOMD1 compatible. + + _logger.info("Applying SOMD1 perturbation compatibility.") + self._system = _make_compatible(self._system) + self._system = _sr.morph.link_to_reference(self._system) + + # Next, swap the water topology so that it is in AMBER format. + + try: + waters = self._system["water"] + except: + waters = [] + + if len(waters) > 0: + from sire.legacy.IO import isAmberWater as _isAmberWater + from sire.legacy.IO import setAmberWater as _setAmberWater + + if not _isAmberWater(waters[0]): + num_atoms = waters[0].num_atoms() + + if num_atoms == 3: + # Here we assume TIP3p for any 3-point water model. + model = "tip3p" + elif num_atoms == 4: + model = "tip4p" + elif num_atoms == 5: + model = "tip5p" + + try: + self._system = _System( + _setAmberWater(self._system._system, model) + ) + _logger.info( + "Converting water topology to AMBER format for SOMD1 compatibility." + ) + except Exception as e: + _logger.error( + "Unable to convert water topology to AMBER format for SOMD1 compatibility." + ) + raise e + + # Ghost atoms are considered light when adding bond constraints. + self._config._extra_args["ghosts_are_light"] = True + + # Apply Boresch modifications to bonded terms involving ghost atoms to + # avoid spurious couplings to the physical system at the end states. + else: + from .._utils._ghosts import _boresch + + self._system = _boresch(self._system) + + # Check for a periodic space. + self._has_space = self._check_space() + + # Check the end state contraints. + self._check_end_state_constraints() + + # Get the charge difference between the two end states. + charge_diff = self._get_charge_difference(self._system) + + # Make sure the difference is integer valued to 5 decimal places. + if not round(charge_diff, 4).is_integer(): + _logger.warning("Charge difference between end states is not an integer.") + charge_diff = int(round(charge_diff, 4)) + + # Make sure the charge difference matches the expected value + # from the config. + if ( + self._config.charge_difference is not None + and self._config.charge_difference != charge_diff + ): + _logger.warning( + f"The charge difference of {charge_diff} between the end states " + f"does not match the specified value of {self._config.charge_difference}" + ) + # The user value takes precedence. + charge_diff = self._config.charge_difference + _logger.info( + f"Using user-specified value of {self._config.charge_difference}" + ) + else: + # Report that the charge will automatically be held constant. + if charge_diff != 0 and self._config.charge_difference is None: + _logger.info( + f"There is a charge difference of {charge_diff} between the end states. " + f"Adding alchemical ions to keep the charge constant." + ) + + # Create alchemical ions. + if charge_diff != 0: + self._system = self._create_alchemical_ions(self._system, charge_diff) + + # Set the lambda values. + if self._config.lambda_values: + self._lambda_values = self._config.lambda_values + else: + self._lambda_values = [ + round(i / (self._config.num_lambda - 1), 5) + for i in range(0, self._config.num_lambda) + ] + + # Set the lambda energy list. + if self._config.lambda_energy is not None: + self._lambda_energy = self._config.lambda_energy + else: + self._lambda_energy = self._lambda_values + + # Work out the current hydrogen mass factor. + h_mass_factor, has_hydrogen = self._get_h_mass_factor(self._system) + + # HMR has already been applied. + from math import isclose + + if has_hydrogen: + if not isclose(h_mass_factor, 1.0, abs_tol=1e-4): + _logger.info( + f"Detected existing hydrogen mass repartioning factor of {h_mass_factor:.3f}." + ) + + if not isclose(h_mass_factor, self._config.h_mass_factor, abs_tol=1e-4): + new_factor = self._config.h_mass_factor / h_mass_factor + _logger.warning( + f"Existing hydrogen mass repartitioning factor of {h_mass_factor:.3f} " + f"does not match the requested value of {self._config.h_mass_factor:.3f}. " + f"Applying new factor of {new_factor:.3f}." + ) + self._system = self._repartition_h_mass(self._system, new_factor) + + else: + self._system = self._repartition_h_mass( + self._system, self._config.h_mass_factor + ) + + # Flag whether this is a GPU simulation. + self._is_gpu = self._config.platform in ["cuda", "opencl", "hip"] + + # Need to verify before doing any directory checks + if self._config.restart: + self._verify_restart_config() + + # Check the output directories and create names of output files. + self._filenames = self._prepare_output() + + # Check for a valid restart. + if self._config.restart: + self._is_restart, self._system = self._check_restart() + else: + self._is_restart = False + + # Save config whenever 'configure' is called to keep it up to date + if self._config.write_config: + _dict_to_yaml( + self._config.as_dict(), + self._filenames[0]["config"], + ) + + # Save the end state topologies to the output directory. + if isinstance(self._system, list): + mols = self._system[0] + else: + mols = self._system + mols0 = _sr.morph.link_to_reference(mols) + mols1 = _sr.morph.link_to_perturbed(mols) + _sr.save(mols0, self._filenames["topology0"]) + _sr.save(mols1, self._filenames["topology1"]) + + # Initialise the number of frames. + self._num_frames = [0] * len(self._lambda_values) + + # Append only this number of lines from the end of the dataframe during checkpointing. + self._energy_per_block = int( + self._config.checkpoint_frequency / self._config.energy_frequency + ) + + # Create the default dynamics kwargs dictionary. These can be overloaded + # as needed. + self._dynamics_kwargs = { + "integrator": config.integrator, + "temperature": config.temperature, + "pressure": config.pressure if self._has_space else None, + "barostat_frequency": config.barostat_frequency, + "timestep": config.timestep, + "restraints": config.restraints, + "cutoff_type": config.cutoff_type, + "cutoff": config.cutoff, + "schedule": config.lambda_schedule, + "platform": config.platform, + "constraint": config.constraint, + "perturbable_constraint": config.perturbable_constraint, + "include_constrained_energies": config.include_constrained_energies, + "dynamic_constraints": config.dynamic_constraints, + "swap_end_states": config.swap_end_states, + "com_reset_frequency": config.com_reset_frequency, + "vacuum": not self._has_space, + "coulomb_power": config.coulomb_power, + "shift_coulomb": config.shift_coulomb, + "shift_delta": config.shift_delta, + "map": config._extra_args, + } + + def _check_space(self): + """ + Check if the system has a periodic space. + + Returns + ------- + + has_space: bool + Whether the system has a periodic space. + """ + if ( + self._system.has_property("space") + and self._system.property("space").is_periodic() + ): + return True + else: + _logger.info("No periodic space detected. Assuming vacuum simulation.") + if self._config.cutoff_type == "pme": + _logger.info( + "Cannot use PME for non-periodic simulations. Using RF cutoff instead." + ) + self._config.cutoff_type = "rf" + return False + + def _check_end_state_constraints(self): + """ + Internal function to check whether the constrants are the same at the two + end states. + """ + + # Find all perturbable molecules in the system.. + pert_mols = self._system.molecules("property is_perturbable") + + # Check constraints at lambda = 0 and lambda = 1 for each perturbable molecule. + for mol in pert_mols: + # Create a dynamics object. + d = mol.dynamics( + constraint=self._config.constraint, + perturbable_constraint=self._config.perturbable_constraint, + platform="cpu", + map=self._config._extra_args, + ) + + # Get the constraints at lambda = 0. + constraints0 = d.get_constraints() + + # Update to lambda = 1. + d.set_lambda(1) + + # Get the constraints at lambda = 1. + constraints1 = d.get_constraints() + + # Check for equivalence. + if len(constraints0) != len(constraints1): + _logger.info( + f"Constraints are at not the same at {_lam_sym} = 0 and {_lam_sym} = 1." + ) + else: + for c0, c1 in zip(constraints0, constraints1): + if c0 != c1: + _logger.info( + f"Constraints are at not the same at {_lam_sym} = 0 and {_lam_sym} = 1." + ) + break + + @staticmethod + def _get_charge_difference(system): + """ + Internal function to check the charge difference between the two end states. + + Parameters + ---------- + + system: :class: `System ` + The system to be perturbed. + + Returns + ------- + + charge_diff: int + The charge difference between the perturbed and reference states. + """ + + reference = _sr.morph.link_to_reference(system).charge().value() + perturbed = _sr.morph.link_to_perturbed(system).charge().value() + + return perturbed - reference + + @staticmethod + def _create_alchemical_ions(system, charge_diff): + """ + Internal function to create alchemical ions to maintain a constant charge. + + Parameters + ---------- + + system: :class: `System ` + The system to be perturbed. + + charge_diff: int + The charge difference between perturbed and reference states. + + Returns + ------- + + system: :class: `System ` + The perturbed system with alchemical ions added. + """ + + from sire.legacy.IO import createChlorineIon as _createChlorineIon + from sire.legacy.IO import createSodiumIon as _createSodiumIon + + # Clone the system. + system = system.clone() + + # The number of waters to convert is the absolute charge difference. + num_waters = abs(charge_diff) + + # Make sure there are enough waters to convert. The charge difference should + # never be this large, but it prevents a crash if it is. + if num_waters > len(system["water"].molecules()): + raise ValueError( + f"Insufficient waters to convert to ions. {num_waters} required, " + f"{len(system['water'].molecules())} available." + ) + + # Reference coordinates. + coords = system.molecules("property is_perturbable").coordinates() + coord_string = f"{coords[0].value()}, {coords[1].value()}, {coords[2].value()}" + + # Find the furthest N waters from the perturbable molecule. + waters = system[f"furthest {num_waters} waters from {coord_string}"].molecules() + + # Determine the water model. + if waters[0].num_atoms() == 3: + model = "tip3p" + elif waters[0].num_atoms() == 4: + model = "tip4p" + elif waters[0].num_atoms() == 5: + # Note that AMBER has no ion model for tip5p. + model = "tip4p" + + # Store the molecule numbers for the system. + numbers = system.numbers() + + # Create the ions. + for water in waters: + # Flag to indicate whether we need to reverse the alchemical ion + # perturbation, i.e. ion to water, rather than water to ion. + is_reverse = False + + # Create an ion to keep the charge constant throughout the + # perturbation. + if charge_diff > 0: + # Try to find a free chlorine ion so that we match parameters. + try: + has_ion = False + ions = system["element Cl"].molecules() + for ion in ions: + if ion.num_atoms() == 1: + has_ion = True + _logger.debug("Found Cl- ion in system.") + break + + # If there isn't an ion, then try searching for a free sodium ion. + if not has_ion: + ions = system["element Na"].molecules() + for ion in ions: + if ion.num_atoms() == 1: + has_ion = True + is_reverse = True + _logger.debug("Found Na+ ion in system.") + break + + # If not found, create one using a template. + if not has_ion: + _logger.debug(f"Creating Cl- ion from {model} water template.") + ion = _createChlorineIon( + water["element O"].coordinates(), model + ) + + # If not found, create one using a template. + except: + _logger.debug(f"Creating Cl- ion from {model} water template.") + ion = _createChlorineIon(water["element O"].coordinates(), model) + + # Create the ion string. + if is_reverse: + ion_str = "Na+" + else: + ion_str = "Cl-" + + else: + # Try to find a free sodium ion so that we match parameters. + try: + has_ion = False + ions = system["element Na"].molecules() + for ion in ions: + if ion.num_atoms() == 1: + has_ion = True + _logger.debug("Found Na+ ion in system.") + break + + # If there isn't an ion, then try searching for a free chlorine ion. + if not has_ion: + ions = system["element Cl"].molecules() + for ion in ions: + if ion.num_atoms() == 1: + has_ion = True + is_reverse = True + _logger.debug("Found Cl- ion in system.") + break + + # If not found, create one using a template. + if not has_ion: + _logger.debug(f"Creating Na+ ion from {model} water template.") + ion = _createSodiumIon(water["element O"].coordinates(), model) + + # If not found, create one using a template. + except: + _logger.debug(f"Creating Na+ ion from {model} water template.") + ion = _createSodiumIon(water["element O"].coordinates(), model) + + # Create the ion string. + if is_reverse: + ion_str = "Cl-" + else: + ion_str = "Na+" + + # Create an alchemical ion: ion --> water. + if is_reverse: + merged = _sr.morph.merge(ion, water, map={"as_new_molecule": False}) + # Create an alchemical ion: water --> ion. + else: + merged = _sr.morph.merge(water, ion, map={"as_new_molecule": False}) + + # Update the system. + system.update(merged) + + # Get the index of the perturbed water. + index = numbers.index(water.number()) + + # Log that we are adding an alchemical ion. + if is_reverse: + _logger.info( + f"Water at molecule index {index} will be perturbed from a " + f"{ion_str} ion to keep charge constant." + ) + else: + _logger.info( + f"Water at molecule index {index} will be perturbed to a " + f"{ion_str} ion to keep charge constant." + ) + + return system + + @staticmethod + def _create_filenames(lambda_array, lambda_value, output_directory, restart=False): + # Create incremental file name for current restart. + def increment_filename(base_filename, suffix): + file_number = 0 + file_path = _Path(output_directory) + while True: + filename = ( + f"{base_filename}_{file_number}.{suffix}" + if file_number > 0 + else f"{base_filename}.{suffix}" + ) + full_path = file_path / filename + if not full_path.exists(): + return filename + file_number += 1 + + if lambda_value not in lambda_array: + raise ValueError("lambda_value not in lambda_array") + lam = f"{lambda_value:.5f}" + filenames = {} + filenames["checkpoint"] = str(output_directory / f"checkpoint_{lam}.s3") + filenames["energy_traj"] = str(output_directory / f"energy_traj_{lam}.parquet") + filenames["trajectory"] = str(output_directory / f"traj_{lam}.dcd") + filenames["trajectory_chunk"] = str(output_directory / f"traj_{lam}_") + filenames["energy_components"] = str( + output_directory / f"energy_components_{lam}.txt" + ) + if restart: + filenames["config"] = str( + output_directory / increment_filename("config", "yaml") + ) + else: + filenames["config"] = str(output_directory / "config.yaml") + return filenames + + def _prepare_output(self): + """ + Prepare the output directory and create for simulation, creating the + file names for each lambda value. The directory is checked for existing + output and files are deleted if necessary. Incremental file names are + created for restarts. + + Returns + ------- + + filenames: dict + Dictionary of file names for each lambda value. + """ + from pathlib import Path as _Path + + filenames = {} + deleted = [] + for i, lambda_value in enumerate(self._lambda_values): + files = self._create_filenames( + self._lambda_values, + lambda_value, + self._config.output_directory, + self._config.restart, + ) + filenames[i] = files + if not self._config.restart: + for file in files.values(): + if _Path.exists(_Path(file)): + deleted.append(_Path(file)) + if len(deleted) > 0: + if not self._config.overwrite: + deleted_str = [str(file) for file in deleted] + _logger.error( + f"The following files already exist, use --overwrite to overwrite them: {list(set((deleted_str)))} \n" + ) + exit(1) + # Loop over files to be deleted, ignoring duplicates. + for file in list(set(deleted)): + file.unlink() + + filenames["topology0"] = str(self._config.output_directory / "system0.prm7") + filenames["topology1"] = str(self._config.output_directory / "system1.prm7") + + return filenames + + def _check_restart(self): + """ + Check the output directory for a valid restart state. + + Returns + ------- + + is_restart: bool + Whether the simulation is a restart. + + system: :class: `System `, List[:class: `System `] + The system or list of systems to be restarted. + """ + + # List to store systems for each lambda value. + systems = [] + + for i, lambda_value in enumerate(self._lambda_values): + # Try to load the checkpoint file. + try: + system = _sr.stream.load(self._filenames[i]["checkpoint"]) + except: + _logger.warning( + f"Unable to load checkpoint file for {_lam_sym}={lambda_value:.5f}, starting from scratch." + ) + return False, self._system + else: + # Check the system is the same as the reference system. + are_same, reason = self._systems_are_same(self._system, system) + if not are_same: + raise ValueError( + f"Checkpoint file does not match system for the following reason: {reason}." + ) + # Make sure the configuration is consistent. + try: + self._compare_configs( + self._last_config, dict(system.property("config")) + ) + except: + config = dict(system.property("config")) + _logger.debug( + f"last config: {self._last_config}, current config: {config}" + ) + _logger.error( + f"Config for {_lam_sym}={lambda_value} does not match previous config." + ) + raise + # Make sure the lambda value is consistent. + else: + lambda_restart = system.property("lambda") + try: + lambda_restart == lambda_value + except: + filename = self._filenames[i]["checkpoint"] + raise ValueError( + f"Lambda value from checkpoint file {filename} for {lambda_restart} " + f"does not match expected value {lambda_value}." + ) + + # Append the system to the list. + systems.append(_sr.morph.link_to_reference(system)) + + return True, systems + + @staticmethod + def _compare_configs(config1, config2): + """ + Internal function to check compatibility between two configuration files. + """ + + if not isinstance(config1, dict): + raise TypeError("'config1' must be of type 'dict'") + if not isinstance(config2, dict): + raise TypeError("'config2' must be of type 'dict'") + + from sire.units import GeneralUnit as _GeneralUnit + + # Define the subset of settings that are allowed to change after restart. + allowed_diffs = [ + "runtime", + "restart", + "minimise", + "max_threads", + "equilibration_time", + "equilibration_timestep", + "equilibration_constraints", + "energy_frequency", + "save_trajectory", + "frame_frequency", + "save_velocities", + "checkpoint_frequency", + "platform", + "max_threads", + "max_gpus", + "restart", + "save_trajectories", + "write_config", + "log_level", + "log_file", + "overwrite", + ] + for key in config1.keys(): + if key not in allowed_diffs: + # Extract the config values. + v1 = config1[key] + v2 = config2[key] + + # Convert GeneralUnits to strings for comparison. + if isinstance(v1, _GeneralUnit): + v1 = str(v1) + if isinstance(v2, _GeneralUnit): + v2 = str(v2) + + # If one is from sire and the other is not, will raise error even though they are the same. + if (v1 == None and v2 == False) or (v2 == None and v1 == False): + continue + elif v1 != v2: + raise ValueError( + f"{key} has changed since the last run. This is not allowed when using the restart option." + ) + + def _verify_restart_config(self): + """ + Verify that the config file matches the config file used to create the + checkpoint file. + """ + import yaml as _yaml + + def get_last_config(output_directory): + """ + Returns the last config file in the output directory. + """ + import os as _os + + config_files = [ + file + for file in _os.listdir(output_directory) + if file.endswith(".yaml") and file.startswith("config") + ] + config_files.sort() + return config_files[-1] + + try: + last_config = get_last_config(self._config.output_directory) + with open(self._config.output_directory / last_config) as file: + _logger.debug(f"Opening config file {last_config}") + self._last_config = _yaml.safe_load(file) + config = self._config.as_dict() + except: + _logger.info( + f"""No config files found in {self._config.output_directory}, + attempting to retrieve config from lambda = 0 checkpoint file.""" + ) + try: + system_temp = _sr.stream.load( + str(self._config.output_directory / "checkpoint_0.00000.s3") + ) + except: + expdir = self._config.output_directory / "checkpoint_0.00000.s3" + _logger.error(f"Unable to load checkpoint file from {expdir}.") + raise + else: + self._last_config = dict(system_temp.property("config")) + config = self._config.as_dict(sire_compatible=True) + del system_temp + + self._compare_configs(self._last_config, config) + + @staticmethod + def _systems_are_same(system0, system1): + """ + Check for equivalence between a pair of sire systems. + + Parameters + ---------- + + system0: sire.system.System + The first system to be compared. + + system1: sire.system.System + The second system to be compared. + + Returns + ------- + + are_same: bool + Whether the systems are the same. + """ + if not isinstance(system0, _System): + raise TypeError("'system0' must be of type 'sire.system.System'") + if not isinstance(system1, _System): + raise TypeError("'system1' must be of type 'sire.system.System'") + + # Check for matching number of molecules. + if not len(system0.molecules()) == len(system1.molecules()): + reason = "number of molecules do not match" + return False, reason + + # Check for matching number of residues. + if not len(system0.residues()) == len(system1.residues()): + reason = "number of residues do not match" + return False, reason + + # Check for matching number of atoms. + if not len(system0.atoms()) == len(system1.atoms()): + reason = "number of atoms do not match" + return False, reason + + return True, None + + @staticmethod + def _get_gpu_devices(platform): + """ + Get list of available GPUs from CUDA_VISIBLE_DEVICES, + OPENCL_VISIBLE_DEVICES, or HIP_VISIBLE_DEVICES. + + Parameters + ---------- + + platform: str + The GPU platform to be used for simulations. + + Returns + -------- + + available_devices : [int] + List of available device numbers. + """ + + if not isinstance(platform, str): + raise TypeError("'platform' must be of type 'str'") + + platform = platform.lower().replace(" ", "") + + if platform not in ["cuda", "opencl", "hip"]: + raise ValueError("'platform' must be one of 'cuda', 'opencl', or 'hip'.") + + import os as _os + + if platform == "cuda": + if _os.environ.get("CUDA_VISIBLE_DEVICES") is None: + raise ValueError("CUDA_VISIBLE_DEVICES not set") + else: + available_devices = _os.environ.get("CUDA_VISIBLE_DEVICES").split(",") + _logger.info(f"CUDA_VISIBLE_DEVICES set to {available_devices}") + elif platform == "opencl": + if _os.environ.get("OPENCL_VISIBLE_DEVICES") is None: + raise ValueError("OPENCL_VISIBLE_DEVICES not set") + else: + available_devices = _os.environ.get("OPENCL_VISIBLE_DEVICES").split(",") + _logger.info(f"OPENCL_VISIBLE_DEVICES set to {available_devices}") + elif platform == "hip": + if _os.environ.get("HIP_VISIBLE_DEVICES") is None: + raise ValueError("HIP_VISIBLE_DEVICES not set") + else: + available_devices = _os.environ.get("HIP_VISIBLE_DEVICES").split(",") + _logger.info(f"HIP_VISIBLE_DEVICES set to {available_devices}") + + num_gpus = len(available_devices) + _logger.info(f"Number of GPUs available: {num_gpus}") + + return available_devices + + @staticmethod + def _get_h_mass_factor(system): + """ + Get the current hydrogen mass factor. + + Parameters + ---------- + + system : :class: `System ` + The system of interest. + """ + + from sire.mol import Element + + # Store the expected hydrogen mass. + expected_h_mass = Element("H").mass().value() + + # Get the mass of the first hydrogen atom. + try: + h_mass = system["element H"][0].mass() + except: + return expected_h_mass, False + + # Work out the current hydrogen mass factor. We round to 3dp due to + # the precision of atomic masses loaded from text files. + return round(h_mass.value() / expected_h_mass, 3), True + + @staticmethod + def _repartition_h_mass(system, factor=1.0): + """ + Repartition hydrogen masses. + + Parameters + ---------- + + system : :class: `System ` + The system to be repartitioned. + + factor :float + The factor by which hydrogen masses will be scaled. + + Returns + ------- + + system : :class: `System ` + The repartitioned system. + """ + + if not isinstance(factor, float): + raise TypeError("'factor' must be of type 'float'") + + from math import isclose + + # Early exit if no repartitioning is required. + if isclose(factor, 1.0, abs_tol=1e-4): + return system + + from sire.morph import ( + repartition_hydrogen_masses as _repartition_hydrogen_masses, + ) + + _logger.info(f"Repartitioning hydrogen masses with factor {factor:.3f}") + + return _repartition_hydrogen_masses( + system, + mass_factor=factor, + ) + + def _checkpoint( + self, + system, + index, + block, + speed, + lambda_energy=None, + lambda_grad=None, + is_final_block=False, + ): + """ + Save a checkpoint file. + + Parameters + ---------- + + system : :class: `System ` + The system to be saved. + + index : int + The index of the window or replica. + + block : int + The current block number. + + speed: float + The speed of the simulation is ns/day. + + lambda_energy : List[float] + The sampled lambda energy values. + + lambda_grad : List[float] + Lambda values for finite-difference gradients. + + is_final_block: bool + Whether this is the final block of the simulation. + + speed: + """ + + from somd2 import __version__, _sire_version, _sire_revisionid + + # Get the lambda value. + lam = self._lambda_values[index] + + # Get the energy trajectory. + df = system.energy_trajectory(to_alchemlyb=True, energy_unit="kT") + + if lambda_energy is None: + lamdba_energy = self._lambda_values + + # Create the metadata. + metadata = { + "attrs": df.attrs, + "somd2 version": __version__, + "sire version": f"{_sire_version}+{_sire_revisionid}", + "lambda": str(lam), + "lambda_array": lambda_energy, + "speed": speed, + "temperature": str(self._config.temperature.value()), + } + + # Add the lambda gradient if available. + if lambda_grad is not None: + metadata["lambda_grad"] = lambda_grad + + if is_final_block: + # Assemble and save the final trajectory. + if self._config.save_trajectories: + # Save the final trajectory chunk to file. + if system.num_frames() > self._num_frames[index]: + traj_filename = ( + self._filenames[index]["trajectory_chunk"] + f"{block}.dcd" + ) + _sr.save( + system.trajectory()[self._num_frames[index] :], + traj_filename, + format=["DCD"], + ) + self._num_frames[index] = system.num_frames() + + # Create the final topology file name. + topology0 = self._filenames["topology0"] + + # Create the final trajectory file name. + traj_filename = self._filenames[index]["trajectory"] + + # Glob for the trajectory chunks. + from glob import glob + + traj_chunks = sorted( + glob(f"{self._filenames[index]['trajectory_chunk']}*") + ) + + # If this is a restart, then we need to check for an existing + # trajectory file with the same name. If it exists and is non-empty, + # then copy it to a backup file and prepend it to the list of chunks. + if self._config.restart: + path = _Path(traj_filename) + if path.exists() and path.stat().st_size > 0: + from shutil import copyfile + + copyfile(traj_filename, f"{traj_filename}.bak") + traj_chunks = [f"{traj_filename}.bak"] + traj_chunks + + # Load the topology and chunked trajectory files. + mols = _sr.load([topology0] + traj_chunks) + + # Save the final trajectory to a single file. + _sr.save(mols.trajectory(), traj_filename, format=["DCD"]) + self._num_frames[index] = system.num_frames() + + # Now remove the chunked trajectory files. + for chunk in traj_chunks: + _Path(chunk).unlink() + + # Add config and lambda value to the system properties. + system.set_property("config", self._config.as_dict(sire_compatible=True)) + system.set_property("lambda", lam) + + # Delete the trajectory frames. + system.delete_all_frames() + + # Stream the final system to file. + _sr.stream.save(system, self._filenames[index]["checkpoint"]) + + # Create the final parquet file. + self._parquet = _dataframe_to_parquet( + df, + metadata=metadata, + filename=self._filenames[index]["energy_traj"], + ) + + else: + # Update the starting block if necessary. + if block == 0: + block = self._start_block + + # Save the current trajectory chunk to file. + if self._config.save_trajectories: + if system.num_frames() > self._num_frames[index]: + traj_filename = ( + self._filenames[index]["trajectory_chunk"] + f"{block}.dcd" + ) + _sr.save( + system.trajectory()[self._num_frames[index] :], + traj_filename, + format=["DCD"], + ) + self._num_frames[index] = system.num_frames() + + # Encode the configuration and lambda value as system properties. + system.set_property("config", self._config.as_dict(sire_compatible=True)) + system.set_property("lambda", lam) + + # Delete the trajectory frames. + system.delete_all_frames() + + # Stream the checkpoint to file. + _sr.stream.save(system, self._filenames[index]["checkpoint"]) + + # Create the parquet file. + if block == self._start_block: + self._parquet = _dataframe_to_parquet( + df, + metadata=metadata, + filename=self._filenames[index]["energy_traj"], + ) + # Append to the parquet file. + else: + _parquet_append( + self._parquet, + df.iloc[-self._energy_per_block :], + ) diff --git a/src/somd2/runner/_dynamics.py b/src/somd2/runner/_dynamics.py deleted file mode 100644 index 525d274..0000000 --- a/src/somd2/runner/_dynamics.py +++ /dev/null @@ -1,653 +0,0 @@ -###################################################################### -# SOMD2: GPU accelerated alchemical free-energy engine. -# -# Copyright: 2023-2024 -# -# Authors: The OpenBioSim Team -# -# SOMD2 is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# SOMD2 is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with SOMD2. If not, see . -##################################################################### - -__all__ = ["Dynamics"] - -from pathlib import Path as _Path - -from somd2 import _logger - -from ..config import Config as _Config -from ..io import dataframe_to_parquet as _dataframe_to_parquet -from ..io import parquet_append as _parquet_append -from .._utils import _lam_sym - - -class Dynamics: - """ - Class for controlling the running and bookkeeping of a single lambda value - simulation.Currently just a wrapper around Sire dynamics. - """ - - def __init__( - self, - system, - lambda_val, - lambda_array, - lambda_energy, - config, - increment=0.001, - device=None, - has_space=True, - ): - """ - Constructor - - Parameters - ---------- - - system : Sire System - Sire system containing at least one perturbable molecule - - lambda_val : float - Lambda value for the simulation - - lambda_array : list - List of lambda values to be used for simulation. - - lambda_energy: list - List of lambda values to be used for sampling energies. If None, then we - won't return reduced perturbed energies. - - increment : float - Increment of lambda value - used for calculating the gradient - - config : somd2 Config object - Config object containing simulation options - - device : int - GPU device number to use - does nothing if running on CPU (default None) - - has_space : bool - Whether this simulation has a periodic space or not. Disable NPT if - no space is present. - - """ - - try: - system.molecules("property is_perturbable") - except KeyError: - raise KeyError("No perturbable molecules in the system") - - self._system = system - - if not isinstance(config, _Config): - raise TypeError("config must be a Config object") - - self._config = config - # If restarting, subtract the time already run from the total runtime - if self._config.restart: - self._config.runtime = str(self._config.runtime - self._system.time()) - - # Work out the current block number. - self._current_block = int( - round( - self._system.time().value() - / self._config.checkpoint_frequency.value(), - 12, - ) - ) - else: - self._current_block = 0 - - lambda_energy = lambda_energy.copy() - if not lambda_val in lambda_energy: - lambda_energy.append(lambda_val) - lambda_energy = sorted(lambda_energy) - - self._lambda_val = lambda_val - self._lambda_array = lambda_array - self._lambda_energy = lambda_energy - self._increment = increment - self._device = device - self._has_space = has_space - self._filenames = self.create_filenames( - self._lambda_array, - self._lambda_val, - self._config.output_directory, - self._config.restart, - ) - - self._num_frames = 0 - self._nrg_sample = 0 - self._nrg_file = "energy_components.txt" - - @staticmethod - def create_filenames(lambda_array, lambda_value, output_directory, restart=False): - # Create incremental file name for current restart. - def increment_filename(base_filename, suffix): - file_number = 0 - file_path = _Path(output_directory) - while True: - filename = ( - f"{base_filename}_{file_number}.{suffix}" - if file_number > 0 - else f"{base_filename}.{suffix}" - ) - full_path = file_path / filename - if not full_path.exists(): - return filename - file_number += 1 - - if lambda_value not in lambda_array: - raise ValueError("lambda_value not in lambda_array") - lam = f"{lambda_value:.5f}" - filenames = {} - filenames["topology0"] = "system0.prm7" - filenames["topology1"] = "system1.prm7" - filenames["checkpoint"] = f"checkpoint_{lam}.s3" - filenames["energy_traj"] = f"energy_traj_{lam}.parquet" - filenames["trajectory"] = f"traj_{lam}.dcd" - filenames["trajectory_chunk"] = f"traj_{lam}_" - filenames["energy_components"] = f"energy_components_{lam}.txt" - if restart: - filenames["config"] = increment_filename("config", "yaml") - else: - filenames["config"] = "config.yaml" - return filenames - - def _setup_dynamics(self, equilibration=False): - """ - Setup the dynamics object. - - Parameters - ---------- - - lam_val_min : float - Lambda value at which to run minimisation, - if None run at pre-set lambda_val - - equilibration : bool - If True, use equilibration settings, otherwise use production settings - """ - - # Don't use NPT for vacuum simulations. - if self._has_space: - pressure = self._config.pressure - else: - pressure = None - - # Create the dynamics object. - self._dyn = self._system.dynamics( - integrator=self._config.integrator, - temperature=self._config.temperature, - pressure=pressure, - barostat_frequency=self._config.barostat_frequency, - timestep=( - self._config.equilibration_timestep - if equilibration - else self._config.timestep - ), - restraints=self._config.restraints, - lambda_value=self._lambda_val, - cutoff_type=self._config.cutoff_type, - cutoff=self._config.cutoff, - schedule=self._config.lambda_schedule, - platform=self._config.platform, - device=self._device, - constraint=( - "none" - if equilibration and not self._config.equilibration_constraints - else self._config.constraint - ), - perturbable_constraint=( - "none" - if equilibration and not self._config.equilibration_constraints - else self._config.perturbable_constraint - ), - include_constrained_energies=self._config.include_constrained_energies, - dynamic_constraints=self._config.dynamic_constraints, - swap_end_states=self._config.swap_end_states, - com_reset_frequency=self._config.com_reset_frequency, - vacuum=not self._has_space, - coulomb_power=self._config.coulomb_power, - shift_coulomb=self._config.shift_coulomb, - shift_delta=self._config.shift_delta, - map=self._config._extra_args, - ) - - def _minimisation( - self, lambda_min=None, constraint="none", perturbable_constraint="none" - ): - """ - Minimisation of self._system. - - Parameters - ---------- - - lambda_min : float - Lambda value at which to run minimisation, if None run at pre-set - lambda_val. - """ - - if lambda_min is None: - _logger.info(f"Minimising at {_lam_sym} = {self._lambda_val}") - try: - # Create a minimisation object. - m = self._system.minimisation( - cutoff_type=self._config.cutoff_type, - cutoff=self._config.cutoff, - schedule=self._config.lambda_schedule, - lambda_value=self._lambda_val, - platform=self._config.platform, - vacuum=not self._has_space, - constraint=constraint, - perturbable_constraint=perturbable_constraint, - include_constrained_energies=self._config.include_constrained_energies, - dynamic_constraints=self._config.dynamic_constraints, - swap_end_states=self._config.swap_end_states, - coulomb_power=self._config.coulomb_power, - shift_coulomb=self._config.shift_coulomb, - shift_delta=self._config.shift_delta, - map=self._config._extra_args, - ) - m.run(timeout=self._config.timeout) - self._system = m.commit() - except: - raise - else: - _logger.info(f"Minimising at {_lam_sym} = {lambda_min}") - try: - # Create a minimisation object. - m = self._system.minimisation( - cutoff_type=self._config.cutoff_type, - cutoff=self._config.cutoff, - schedule=self._config.lambda_schedule, - lambda_value=lambda_min, - platform=self._config.platform, - vacuum=not self._has_space, - constraint=constraint, - perturbable_constraint=perturbable_constraint, - include_constrained_energies=self._config.include_constrained_energies, - dynamic_constraints=self._config.dynamic_constraints, - swap_end_states=self._config.swap_end_states, - coulomb_power=self._config.coulomb_power, - shift_coulomb=self._config.shift_coulomb, - shift_delta=self._config.shift_delta, - map=self._config._extra_args, - ) - m.run(timeout=self._config.timeout) - self._system = m.commit() - except: - raise - - def _equilibration(self): - """ - Per-window equilibration. - Currently just runs dynamics without any saving - """ - - _logger.info(f"Equilibrating at {_lam_sym} = {self._lambda_val}") - self._setup_dynamics(equilibration=True) - self._dyn.run( - self._config.equilibration_time, - frame_frequency=0, - energy_frequency=0, - save_velocities=False, - auto_fix_minimise=False, - ) - self._system = self._dyn.commit() - - def _run(self, lambda_minimisation=None, is_restart=False): - """ - Run the simulation with bookkeeping. - - Returns - ------- - - df : pandas dataframe - Dataframe containing the sire energy trajectory. - """ - import sire as sr - - # Save the system topology to a PRM7 file that can be used to load the - # trajectory. - topology0 = str(self._config.output_directory / self._filenames["topology0"]) - topology1 = str(self._config.output_directory / self._filenames["topology1"]) - mols0 = sr.morph.link_to_reference(self._system) - mols1 = sr.morph.link_to_perturbed(self._system) - sr.save(mols0, topology0) - sr.save(mols1, topology1) - - def generate_lam_vals(lambda_base, increment): - """Generate lambda values for a given lambda_base and increment""" - if lambda_base + increment > 1.0 and lambda_base - increment < 0.0: - raise ValueError("Increment too large") - if lambda_base + increment > 1.0: - lam_vals = [lambda_base - increment] - elif lambda_base - increment < 0.0: - lam_vals = [lambda_base + increment] - else: - lam_vals = [lambda_base - increment, lambda_base + increment] - return lam_vals - - if self._config.minimise: - # Minimise with no constraints if we need to equilibrate first. - # This seems to improve the stability of the equilibration. - if self._config.equilibration_time.value() > 0.0 and not is_restart: - constraint = "none" - perturbable_constraint = "none" - else: - constraint = self._config.constraint - perturbable_constraint = self._config.perturbable_constraint - - self._minimisation( - lambda_minimisation, - constraint=constraint, - perturbable_constraint=perturbable_constraint, - ) - - if self._config.equilibration_time.value() > 0.0 and not is_restart: - self._equilibration() - - # Reset the timer to zero - self._system.set_time(sr.u("0ps")) - - # Perform minimisation at the end of equilibration only if the - # timestep is increasing, or the constraint is changing. - if (self._config.timestep > self._config.equilibration_timestep) or ( - not self._config.equilibration_constraints - and self._config.perturbable_constraint != "none" - ): - self._minimisation( - lambda_min=self._lambda_val, - constraint=self._config.constraint, - perturbable_constraint=self._config.perturbable_constraint, - ) - - # Setup the dynamics object for production. - self._setup_dynamics(equilibration=False) - - # Work out the lambda values for finite-difference gradient analysis. - self._lambda_grad = generate_lam_vals(self._lambda_val, self._increment) - - if self._lambda_energy is None: - lam_arr = self._lambda_grad - else: - lam_arr = self._lambda_energy + self._lambda_grad - - _logger.info(f"Running dynamics at {_lam_sym} = {self._lambda_val}") - - # Create the checkpoint file name. - checkpoint_file = str( - _Path(self._config.output_directory) / self._filenames["checkpoint"] - ) - - if self._config.checkpoint_frequency.value() > 0.0: - # Calculate the number of blocks and the remainder time. - frac = ( - self._config.runtime.value() / self._config.checkpoint_frequency.value() - ) - - # Handle the case where the runtime is less than the checkpoint frequency. - if frac < 1.0: - frac = 1.0 - self._config.checkpoint_frequency = str(self._config.runtime) - - num_blocks = int(frac) - rem = frac - num_blocks - - # Append only this number of lines from the end of the dataframe during checkpointing - energy_per_block = ( - self._config.checkpoint_frequency / self._config.energy_frequency - ) - - # Run num_blocks dynamics and then run a final block if rem > 0 - for x in range(int(num_blocks)): - # Add the current block number. - x += self._current_block - - # Run the dynamics. - try: - self._dyn.run( - self._config.checkpoint_frequency, - energy_frequency=self._config.energy_frequency, - frame_frequency=self._config.frame_frequency, - lambda_windows=lam_arr, - save_velocities=self._config.save_velocities, - auto_fix_minimise=False, - ) - except: - raise - - # Checkpoint. - try: - # Save the energy contribution for each force. - if self._config.save_energy_components: - self._save_energy_components() - - # Set to the current block number if this is a restart. - if x == 0: - x = self._current_block - - # Commit the current system. - self._system = self._dyn.commit() - - # Save the current trajectory chunk to file. - if self._config.save_trajectories: - traj_filename = ( - str( - self._config.output_directory - / self._filenames["trajectory_chunk"] - ) - + f"{x}.dcd" - ) - sr.save( - self._system.trajectory()[self._num_frames :], - traj_filename, - format=["DCD"], - ) - self._num_frames = self._system.num_frames() - - # Stream the checkpoint to file. - sr.stream.save(self._system, str(checkpoint_file)) - - # Save the energy trajectory to a parquet file. - df = self._system.energy_trajectory( - to_alchemlyb=True, energy_unit="kT" - ) - if x == self._current_block: - # Not including speed in checkpoints for now. - parquet = _dataframe_to_parquet( - df, - metadata={ - "attrs": df.attrs, - "lambda": str(self._lambda_val), - "lambda_array": self._lambda_energy, - "lambda_grad": self._lambda_grad, - "temperature": str(self._config.temperature.value()), - }, - filepath=self._config.output_directory, - filename=self._filenames["energy_traj"], - ) - # Also want to add the simulation config to the - # system properties once a block has been successfully run. - self._system.set_property( - "config", self._config.as_dict(sire_compatible=True) - ) - # Finally, encode lambda value in to properties. - self._system.set_property("lambda", self._lambda_val) - else: - _parquet_append( - parquet, - df.iloc[-int(energy_per_block) :], - ) - _logger.info( - f"Finished block {x+1} of {self._current_block + num_blocks + int(rem > 0)} " - f"for {_lam_sym} = {self._lambda_val}" - ) - except: - raise - # No need to checkpoint here as it is the final block. - if rem > 0: - x += 1 - try: - self._dyn.run( - rem, - energy_frequency=self._config.energy_frequency, - frame_frequency=self._config.frame_frequency, - lambda_windows=lam_arr, - save_velocities=self._config.save_velocities, - auto_fix_minimise=False, - ) - - # Save the current trajectory chunk to file. - if self._config.save_trajectories: - traj_filename = ( - str( - self._config.output_directory - / self._filenames["trajectory_chunk"] - ) - + f"{x}.dcd" - ) - sr.save( - self._system.trajectory()[self._num_frames :], - traj_filename, - format=["DCD"], - ) - self._num_frames = self._system.num_frames() - - _logger.info( - f"Finished block {x+1} of {self._current_block + num_blocks + int(rem > 0)} " - f"for {_lam_sym} = {self._lambda_val}" - ) - except: - raise - self._system = self._dyn.commit() - else: - try: - self._dyn.run( - self._config.checkpoint_frequency, - energy_frequency=self._config.energy_frequency, - frame_frequency=self._config.frame_frequency, - lambda_windows=lam_arr, - save_velocities=self._config.save_velocities, - auto_fix_minimise=False, - ) - except: - raise - self._system = self._dyn.commit() - - # Assemble and save the final energy trajectory. - if self._config.save_trajectories: - # Create the final trajectory file name. - traj_filename = str( - self._config.output_directory / self._filenames["trajectory"] - ) - - # Glob for the trajectory chunks. - from glob import glob - - traj_chunks = sorted( - glob( - str( - self._config.output_directory - / f"{self._filenames['trajectory_chunk']}*" - ) - ) - ) - - # If this is a restart, then we need to check for an existing - # trajectory file with the same name. If it exists and is non-empty, - # then copy it to a backup file and prepend it to the list of chunks. - if self._config.restart: - path = _Path(traj_filename) - if path.exists() and path.stat().st_size > 0: - from shutil import copyfile - - copyfile(traj_filename, f"{traj_filename}.bak") - traj_chunks = [f"{traj_filename}.bak"] + traj_chunks - - # Load the topology and chunked trajectory files. - system = sr.load([topology0] + traj_chunks) - - # Save the final trajectory to a single file. - traj_filename = str( - self._config.output_directory / self._filenames["trajectory"] - ) - sr.save(system.trajectory(), traj_filename, format=["DCD"]) - self._num_frames = system.num_frames() - - # Now remove the chunked trajectory files. - for chunk in traj_chunks: - _Path(chunk).unlink() - - # Add config and lambda value to the system properties. - self._system.add_shared_property( - "config", self._config.as_dict(sire_compatible=True) - ) - self._system.add_shared_property("lambda", self._lambda_val) - - # Save the final system to checkpoint file. - sr.stream.save(self._system, checkpoint_file) - df = self._system.energy_trajectory(to_alchemlyb=True, energy_unit="kT") - - return df - - def get_timing(self): - return self._dyn.time_speed() - - def _cleanup(self): - del self._dyn - - def _save_energy_components(self): - - from copy import deepcopy - import openmm - - # Get the current context and system. - context = self._dyn._d._omm_mols - system = deepcopy(context.getSystem()) - - # Add each force to a unique group. - for i, f in enumerate(system.getForces()): - f.setForceGroup(i) - - # Create a new context. - new_context = openmm.Context(system, deepcopy(context.getIntegrator())) - new_context.setPositions(context.getState(getPositions=True).getPositions()) - - header = f"{'# Sample':>10}" - record = f"{self._nrg_sample:>10}" - - # Process the records. - for i, f in enumerate(system.getForces()): - state = new_context.getState(getEnergy=True, groups={i}) - header += f"{f.getName():>25}" - record += f"{state.getPotentialEnergy().value_in_unit(openmm.unit.kilocalories_per_mole):>25.2f}" - - # Write to file. - if self._nrg_sample == 0: - with open( - self._config.output_directory / self._filenames["energy_components"], - "w", - ) as f: - f.write(header + "\n") - f.write(record + "\n") - else: - with open( - self._config.output_directory / self._filenames["energy_components"], - "a", - ) as f: - f.write(record + "\n") - - # Increment the sample number. - self._nrg_sample += 1 diff --git a/src/somd2/runner/_repex.py b/src/somd2/runner/_repex.py new file mode 100644 index 0000000..5d1bd03 --- /dev/null +++ b/src/somd2/runner/_repex.py @@ -0,0 +1,609 @@ +###################################################################### +# SOMD2: GPU accelerated alchemical free-energy engine. +# +# Copyright: 2023-2024 +# +# Authors: The OpenBioSim Team +# +# SOMD2 is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# SOMD2 is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with SOMD2. If not, see . +##################################################################### + +__all__ = ["RepexRunner"] + +from numba import njit as _njit + +import numpy as _np + +import sire as _sr + +from somd2 import _logger + +from .._utils import _lam_sym + +from ._base import RunnerBase as _RunnerBase + + +class DynamicsCache: + """ + A class for caching dynamics objects. + """ + + def __init__(self, system, lambdas, num_gpus, dynamics_kwargs): + """ + Constructor. + + Parameters + ---------- + + system: :class: `System `, List[:class: `System `] + The perturbable system, or systems, to be simulated. + + lambdas: np.ndarray + The lambda value for each replica. + + num_gpus: int + The number of GPUs to use. + + dynamics_kwargs: dict + A dictionary of default dynamics keyword arguments. + """ + # Initialise attributes. + self._dynamics = [] + self._lambdas = lambdas + self._states = _np.array(range(len(lambdas))) + self._omm_states = [None] * len(lambdas) + self._omm_volumes = [None] * len(lambdas) + + # Copy the dynamics keyword arguments. + dynamics_kwargs = dynamics_kwargs.copy() + + # Overload the platform. + dynamics_kwargs["platform"] = "cuda" + + # Create the dynamics objects in serial. + for i, lam in enumerate(lambdas): + # Work out the device index. + device = i % num_gpus + + # This is a restart, get the system for this replica. + if isinstance(system, list): + mols = system[i] + # This is a new simulation. + else: + mols = system + + # Overload the device. + dynamics_kwargs["device"] = device + + # Create the dynamics object. + try: + dynamics = mols.dynamics(**dynamics_kwargs) + except Exception as e: + _logger.error( + f"Could not create dynamics object for lambda {lam:.5f}: {e}" + ) + + # Append the dynamics object. + self._dynamics.append(dynamics) + + _logger.info(f"Created dynamics object for lambda {lam:.5f}") + + def get(self, index): + """ + Get the dynamics object for a given index. + + Parameters + ---------- + + index: int + The index of the replica. + + Returns + ------- + + tuple + The dynamics object for the replica. + """ + return self._dynamics[index] + + def save_openmm_state(self, index): + """ + Save the state of the dynamics object. + + Parameters + ---------- + + index: int + The index of the replica. + """ + from openmm.unit import angstrom + + # Get the current OpenMM state. + state = self._dynamics[index]._d._omm_mols.getState( + getPositions=True, getVelocities=True + ) + + # Store the state. + self._omm_states[index] = state + + # Store the volume. + self._omm_volumes[index] = state.getPeriodicBoxVolume().value_in_unit( + angstrom**3 + ) + + def get_states(self): + """ + Get the states of the dynamics objects. + + Returns + ------- + + np.ndarray + The states. + """ + return self._states.copy() + + def set_states(self, states): + """ + Set the states of the dynamics objects. + + Parameters + ---------- + + states: np.ndarray + The new states. + """ + self._old_states = self._states + self._states = states + + def mix_states(self): + """ + Mix the states of the dynamics objects. + """ + # Mix the states. + for i, (old_state, new_state) in enumerate(zip(self._old_states, self._states)): + # The state has changed. + if old_state != new_state: + positions, velocities = self._omm_states[new_state] + self._dynamics[i]._d._omm_mols.setStates(self._omm_states[new_state]) + + +class RepexRunner(_RunnerBase): + """ + A class for running replica exchange simulations. + """ + + def __init__(self, system, config): + """ + Constructor. + + Parameters + ---------- + + system: str, :class: `System ` + The perturbable system to be simulated. This can be either a path + to a stream file, or a Sire system object. + + config: :class: `Config ` + The configuration options for the simulation. + """ + + # Call the base class constructor. + super().__init__(system, config) + + # Get the number of available GPUs. + gpu_devices = self._get_gpu_devices("cuda") + + # We can only use replica exchange if we have a GPU. + if len(gpu_devices) == 0: + _logger.error("No GPUs available. Cannot run replica exchange.") + + # Set the number of GPUs. + if self._config.max_gpus is None: + self._num_gpus = len(gpu_devices) + else: + self._num_gpus = min(self._config.max_gpus, len(gpu_devices)) + + # Create the dynamics cache. + self._dynamics_cache = DynamicsCache( + self._system, + self._lambda_values, + self._num_gpus, + self._dynamics_kwargs, + ) + + # Conversion factor for reduced potential. + kT = (_sr.units.k_boltz * self._config.temperature).to(_sr.units.kcal_per_mol) + self._beta = 1.0 / kT + + # Store the pressure times Avaogadro's number. + NA = 6.02214076e23 / _sr.units.mole + self._pressure = (self._config.pressure * NA).value() + + # If restarting, subtract the time already run from the total runtime + if self._config.restart: + self._config.runtime = str(self._config.runtime - self._system.time()) + + # Work out the current block number. + self._start_block = int( + round( + self._system.time().value() + / self._config.checkpoint_frequency.value(), + 12, + ) + ) + else: + self._start_block = 0 + + def __str__(self): + """Return a string representation of the object.""" + return f"RepexRunner(system={self._system}, config={self._config})" + + def __repr__(self): + """Return a string representation of the object.""" + return self.__str__() + + def run(self): + """ + Run the replica exchange simulation. + """ + + from math import ceil + + from concurrent.futures import ThreadPoolExecutor as _ThreadPoolExecutor + from itertools import repeat as _repeat + + # Work out the number of repex cycles. + cycles = ceil(self._config.runtime / self._config.energy_frequency) + + if self._config.checkpoint_frequency.value() > 0.0: + # Calculate the number of blocks and the remainder time. + frac = ( + self._config.runtime.value() / self._config.checkpoint_frequency.value() + ) + + # Handle the case where the runtime is less than the checkpoint frequency. + if frac < 1.0: + frac = 1.0 + self._config.checkpoint_frequency = str(self._config.runtime) + + num_blocks = int(frac) + rem = frac - num_blocks + + # Work out the number of repex cycles per block. + frac = ( + self._config.checkpoint_frequency.value() + / self._config.energy_frequency.value() + ) + + # Handle the case where the checkpoint frequency is less than the energy frequency. + if frac < 1.0: + frac = 1.0 + self._config.checkpoint_frequency = str(self._config.energy_frequency) + + # Store the number of repex cycles per block. + cycles_per_checkpoint = int(frac) + + # Otherwise, we don't checkpoint. + else: + cycles_per_checkpoint = cycles + num_blocks = 1 + rem = 0 + + # Work out the required number of executors. + executors = ceil( + self._config.num_lambda + / (self._num_gpus * self._config.oversubscription_factor) + ) + + # Create the replica list. + replica_list = list(range(self._config.num_lambda)) + + # Minimise at each lambda value. + if self._config.minimise: + with _ThreadPoolExecutor(max_workers=self._num_gpus) as executor: + try: + for result, index, exception in executor.map( + self._minimise, replica_list + ): + if not result: + _logger.error( + f"Minimisation failed for {_lam_sym} = {self._lambda_values[index]:.5f}: {exception}" + ) + raise exception + except KeyboardInterrupt: + _logger.error("Minimisation cancelled. Exiting.") + exit(1) + + # Current block number. + block = 0 + + # Perform the replica exchange simulation. + for i in range(cycles): + _logger.info(f"Running dynamics for cycle {i+1} of {cycles}") + _logger.info(f"States: {self._dynamics_cache.get_states()}") + + # Clear the results list. + results = [] + + # Whether to checkpoint. + is_checkpoint = i > 0 and i % cycles_per_checkpoint == 0 + + # Run a dynamics block for each replica, making sure only each GPU is only + # oversubscribed by a factor of self._config.oversubscription_factor. + for j in range(executors): + replicas = replica_list[ + j + * self._num_gpus + * self._config.oversubscription_factor : (j + 1) + * self._num_gpus + * self._config.oversubscription_factor + ] + with _ThreadPoolExecutor( + max_workers=self._num_gpus * self._config.oversubscription_factor + ) as executor: + try: + for result, index, energies in executor.map( + self._run_block, + replicas, + _repeat(self._lambda_values), + _repeat(is_checkpoint), + _repeat(i == cycles - 1), + _repeat(block), + _repeat(num_blocks + int(rem > 0)), + ): + if not result: + _logger.error( + f"Dynamics failed for {_lam_sym} = {self._lambda_values[index]:.5f}: {energies}" + ) + raise energies + results.append((index, energies)) + except KeyboardInterrupt: + _logger.error("Dynamics cancelled. Exiting.") + exit(1) + + if i < cycles - 1: + # Assemble and energy matrix from the results. + _logger.info("Assembling energy matrix") + energy_matrix = self._assemble_results(results) + + # Mix the replicas. + _logger.info("Mixing replicas") + self._dynamics_cache.set_states( + self._mix_replicas( + self._config.num_lambda, + energy_matrix, + self._dynamics_cache._states, + ) + ) + self._dynamics_cache.mix_states() + + # Update the block number. + if is_checkpoint: + block += 1 + + def _run_block( + self, index, lambdas, is_checkpoint, is_final_block, block, num_blocks + ): + """ + Run a dynamics block for a given replica. + + Parameters + ---------- + + index: int + The index of the replica. + + lambdas: np.ndarray + The lambda values for each replica. + + is_checkpoint: bool + Whether to checkpoint. + + is_final_block: bool + Whether this is the final block. + + block: int + The block number. + + num_blocks: int + The total number of blocks. + + Returns + ------- + + success: bool + Whether the dynamics was successful. + + index: int + The index of the replica. + + energies: np.ndarray + The energies at each lambda value. If unsuccessful, the exception + is returned. + """ + + # Get the lambda value. + lam = lambdas[index] + + _logger.info(f"Running dynamics for {_lam_sym} = {lam:.5f}") + + try: + # Get the dynamics object. + dynamics = self._dynamics_cache.get(index) + + # Run the dynamics. + dynamics.run( + self._config.energy_frequency, + energy_frequency=self._config.energy_frequency, + frame_frequency=self._config.frame_frequency, + lambda_windows=lambdas, + save_velocities=self._config.save_velocities, + auto_fix_minimise=True, + ) + + # Set the state. + self._dynamics_cache.save_openmm_state(index) + + # Get the energies at each lambda value. + energies = ( + dynamics._d.energy_trajectory() + .to_pandas(to_alchemlyb=True, energy_unit="kcal/mol") + .iloc[-1, :] + .to_numpy() + ) + + # Checkpoint. + if is_checkpoint or is_final_block: + # Commit the current system. + system = dynamics.commit() + + # Get the simulation speed. + speed = dynamics.time_speed() + + # Checkpoint. + self._checkpoint( + system, index, block, speed, is_final_block=is_final_block + ) + + _logger.info( + f"Finished block {block+1} of {self._start_block + num_blocks} " + f"for {_lam_sym} = {lam:.5f}" + ) + + if is_final_block: + _logger.success( + f"{_lam_sym} = {lam:.5f} complete, speed = {speed:.2f} ns day-1" + ) + + except Exception as e: + return False, index, e + + # Return the index and the energies. + return ( + True, + index, + energies, + ) + + def _minimise(self, index): + """ + Minimise the system. + + Parameters + ---------- + + index: int + The index of the replica. + + Returns + ------- + + success: bool + Whether the minimisation was successful. + + index: int + The index of the replica. + + exception: Exception + The exception if the minimisation failed. + """ + _logger.info(f"Minimising at {_lam_sym} = {self._lambda_values[index]:.5f}") + + try: + # Get the dynamics object. + dynamics = self._dynamics_cache.get(index) + + # Minimise the system. + dynamics.minimise(timeout=self._config.timeout) + + except Exception as e: + return False, index, e + + return True, index, None + + def _assemble_results(self, results): + """ + Assemble the results into a matrix. + + Parameters + ---------- + + results: list + The results from the repex dynamics block. + """ + # Create the matrix. + matrix = _np.zeros((len(results), len(results))) + + # Fill the matrix. + for i, energies in results: + for j, energy in enumerate(energies): + matrix[i, j] = self._beta * ( + energy + self._pressure * self._dynamics_cache._omm_volumes[i] + ) + + return matrix + + @staticmethod + @_njit + def _mix_replicas(num_replicas, energy_matrix, states): + """ + Mix the replicas. + + Parameters + ---------- + + num_replicas: int + The number of replicas. + + num_attempts: int + The number of attempts to make. + + energy_matrix: np.ndarray + The energy matrix for the replicas. + + states: np.ndarray + The current state for each replica. + + Returns + ------- + + states: np.ndarray + The new states. + """ + + for swap in range(num_replicas**3): + # Choose two replicas to swap. + replica_i = _np.random.randint(num_replicas) + replica_j = _np.random.randint(num_replicas) + + # Get the current state. + state_i = states[replica_i] + state_j = states[replica_j] + + # Get the energies. + energy_ii = energy_matrix[replica_i, state_i] + energy_jj = energy_matrix[replica_j, state_j] + energy_ij = energy_matrix[replica_i, state_j] + energy_ji = energy_matrix[replica_j, state_i] + + # Compute the log probability of the swap. + log_p_swap = -(energy_ij + energy_ji) + energy_ii + energy_jj + + # Accept or reject the swap. + if log_p_swap >= 0 or _np.random.rand() < _np.exp(log_p_swap): + states[replica_i] = state_j + states[replica_j] = state_i + + return states diff --git a/src/somd2/runner/_runner.py b/src/somd2/runner/_runner.py index de44ed6..3b48966 100644 --- a/src/somd2/runner/_runner.py +++ b/src/somd2/runner/_runner.py @@ -21,23 +21,16 @@ __all__ = ["Runner"] -from sire import morph as _morph -from sire import stream as _stream -from sire.system import System as _System - -from ..config import Config as _Config -from ..io import dataframe_to_parquet as _dataframe_to_parquet -from ..io import dict_to_yaml as _dict_to_yaml - from somd2 import _logger from .._utils import _lam_sym +from ._base import RunnerBase as _RunnerBase -class Runner: + +class Runner(_RunnerBase): """ - Controls the initiation of simulations as well as the assigning of - resources. + Standard simulation runner class. (Uncoupled simulations.) """ from multiprocessing import Manager @@ -61,216 +54,14 @@ def __init__(self, system, config): The configuration options for the simulation. """ - if not isinstance(system, (str, _System)): - raise TypeError("'system' must be of type 'str' or 'sire.system.System'") - - if isinstance(system, str): - # Try to load the stream file. - try: - self._system = _stream.load(system) - except: - raise IOError(f"Unable to load system from stream file: '{system}'") - else: - self._system = system - - # Validate the configuration. - if not isinstance(config, _Config): - raise TypeError("'config' must be of type 'somd2.config.Config'") - self._config = config - self._config._extra_args = {} - - # Log the versions of somd2 and sire. - from somd2 import __version__, _sire_version, _sire_revisionid - - _logger.info(f"somd2 version: {__version__}") - _logger.info(f"sire version: {_sire_version}+{_sire_revisionid}") - - # Check whether we need to apply a perturbation to the reference system. - if self._config.pert_file is not None: - _logger.info( - f"Applying perturbation to reference system: {self._config.pert_file}" - ) - try: - from .._utils._somd1 import _apply_pert - - self._system = _apply_pert(self._system, self._config.pert_file) - except Exception as e: - raise IOError(f"Unable to apply perturbation to reference system: {e}") - - # Make sure the system contains perturbable molecules. - try: - self._system.molecules("property is_perturbable") - except KeyError: - raise KeyError("No perturbable molecules in the system") - - # Link properties to the lambda = 0 end state. - self._system = _morph.link_to_reference(self._system) - - # Set the default configuration options. - - # Restrict the atomic properties used to define light atoms when - # applying constraints. - self._config._extra_args["check_for_h_by_max_mass"] = True - self._config._extra_args["check_for_h_by_mass"] = False - self._config._extra_args["check_for_h_by_element"] = False - self._config._extra_args["check_for_h_by_ambertype"] = False - - # Make sure that perturbable LJ sigmas aren't scaled to zero. - self._config._extra_args["fix_perturbable_zero_sigmas"] = True - - # We're running in SOMD1 compatibility mode. - if self._config.somd1_compatibility: - from .._utils._somd1 import _make_compatible - - # First, try to make the perturbation SOMD1 compatible. - - _logger.info("Applying SOMD1 perturbation compatibility.") - self._system = _make_compatible(self._system) - self._system = _morph.link_to_reference(self._system) - - # Next, swap the water topology so that it is in AMBER format. - - try: - waters = self._system["water"] - except: - waters = [] - - if len(waters) > 0: - from sire.legacy.IO import isAmberWater as _isAmberWater - from sire.legacy.IO import setAmberWater as _setAmberWater - - if not _isAmberWater(waters[0]): - num_atoms = waters[0].num_atoms() - - if num_atoms == 3: - # Here we assume tip3p no SPC/E. - model = "tip3p" - elif num_atoms == 4: - model = "tip4p" - elif num_atoms == 5: - model = "tip5p" - - try: - self._system = _System( - _setAmberWater(self._system._system, model) - ) - _logger.info( - "Converting water topology to AMBER format for SOMD1 compatibility." - ) - except: - _logger.error( - "Unable to convert water topology to AMBER format for SOMD1 compatibility." - ) - - # Ghost atoms are considered light when adding bond constraints. - self._config._extra_args["ghosts_are_light"] = True - - # Apply Boresch modifications to bonded terms involving ghost atoms to - # avoid spurious couplings to the physical system at the end states. - else: - from .._utils._ghosts import _boresch - - self._system = _boresch(self._system) - - # Check for a periodic space. - self._check_space() - - # Check the end state contraints. - self._check_end_state_constraints() - - # Get the charge difference between the two end states. - charge_diff = self._get_charge_difference(self._system) - - # Make sure the difference is integer valued to 5 decimal places. - if not round(charge_diff, 4).is_integer(): - _logger.warning("Charge difference between end states is not an integer.") - charge_diff = int(round(charge_diff, 4)) - - # Make sure the charge difference matches the expected value - # from the config. - if ( - self._config.charge_difference is not None - and self._config.charge_difference != charge_diff - ): - _logger.warning( - f"The charge difference of {charge_diff} between the end states " - f"does not match the specified value of {self._config.charge_difference}" - ) - # The user value takes precedence. - charge_diff = self._config.charge_difference - _logger.info( - f"Using user-specified value of {self._config.charge_difference}" - ) - else: - # Report that the charge will automatically be held constant. - if charge_diff != 0 and self._config.charge_difference is None: - _logger.info( - f"There is a charge difference of {charge_diff} between the end states. " - f"Adding alchemical ions to keep the charge constant." - ) - - # Create alchemical ions. - if charge_diff != 0: - self._system = self._create_alchemical_ions(self._system, charge_diff) + # Call the base class constructor. + super().__init__(system, config) - # Set the lambda values. - if self._config.lambda_values: - self._lambda_values = self._config.lambda_values - else: - self._lambda_values = [ - round(i / (self._config.num_lambda - 1), 5) - for i in range(0, self._config.num_lambda) - ] - - # Set the lambda energy list. + # Store the array of lambda values for energy sampling. if self._config.lambda_energy is not None: - self._lambda_energy = self._config.lambda_energy + self._lambda_energy = self._config.lambda_energy.copy() else: - self._lambda_energy = self._lambda_values - - # Work out the current hydrogen mass factor. - h_mass_factor, has_hydrogen = self._get_h_mass_factor(self._system) - - # HMR has already been applied. - from math import isclose - - if has_hydrogen: - if not isclose(h_mass_factor, 1.0, abs_tol=1e-4): - _logger.info( - f"Detected existing hydrogen mass repartioning factor of {h_mass_factor:.3f}." - ) - - if not isclose(h_mass_factor, self._config.h_mass_factor, abs_tol=1e-4): - new_factor = self._config.h_mass_factor / h_mass_factor - _logger.warning( - f"Existing hydrogen mass repartitioning factor of {h_mass_factor:.3f} " - f"does not match the requested value of {self._config.h_mass_factor:.3f}. " - f"Applying new factor of {new_factor:.3f}." - ) - self._system = self._repartition_h_mass(self._system, new_factor) - - else: - self._system = self._repartition_h_mass( - self._system, self._config.h_mass_factor - ) - - # Flag whether this is a GPU simulation. - self._is_gpu = self._config.platform in ["cuda", "opencl", "hip"] - - # Need to verify before doing any directory checks - if self._config.restart: - self._verify_restart_config() - - # Check the output directories and create names of output files. - self._check_directory() - - # Save config whenever 'configure' is called to keep it up to date - if self._config.write_config: - _dict_to_yaml( - self._config.as_dict(), - self._config.output_directory, - self._fnames[self._lambda_values[0]]["config"], - ) + self._lambda_energy = self._lambda_values.copy() def __str__(self): """Return a string representation of the object.""" @@ -299,923 +90,565 @@ def _create_shared_resources(self): ) ) - def _check_space(self): - """ - Check if the system has a periodic space. - """ - if ( - self._system.has_property("space") - and self._system.property("space").is_periodic() - ): - self._has_space = True - else: - self._has_space = False - _logger.info("No periodic space detected. Assuming vacuum simulation.") - if self._config.cutoff_type == "pme": - _logger.info( - "Cannot use PME for non-periodic simulations. Using RF cutoff instead." - ) - self._config.cutoff_type = "rf" - - def _check_end_state_constraints(self): - """ - Internal function to check whether the constrants are the same at the two - end states. - """ - - # Find all perturbable molecules in the system.. - pert_mols = self._system.molecules("property is_perturbable") - - # Check constraints at lambda = 0 and lambda = 1 for each perturbable molecule. - for mol in pert_mols: - # Create a dynamics object. - d = mol.dynamics( - constraint=self._config.constraint, - perturbable_constraint=self._config.perturbable_constraint, - platform="cpu", - map=self._config._extra_args, - ) - - # Get the constraints at lambda = 0. - constraints0 = d.get_constraints() - - # Update to lambda = 1. - d.set_lambda(1) - - # Get the constraints at lambda = 1. - constraints1 = d.get_constraints() - - # Check for equivalence. - if len(constraints0) != len(constraints1): - _logger.info( - f"Constraints are at not the same at {_lam_sym} = 0 and {_lam_sym} = 1." - ) - else: - for c0, c1 in zip(constraints0, constraints1): - if c0 != c1: - _logger.info( - f"Constraints are at not the same at {_lam_sym} = 0 and {_lam_sym} = 1." - ) - break - - @staticmethod - def _get_charge_difference(system): + def _update_gpu_pool(self, gpu_num): """ - Internal function to check the charge difference between the two end states. + Updates the GPU pool to remove the GPU that has been assigned to a worker. Parameters ---------- - system: :class: `System ` - The system to be perturbed. - - Returns - ------- - - charge_diff: int - The charge difference between the perturbed and reference states. + gpu_num: str + The GPU number to be added to the pool. """ + self._gpu_pool.append(gpu_num) - reference = _morph.link_to_reference(system).charge().value() - perturbed = _morph.link_to_perturbed(system).charge().value() - - return perturbed - reference - - @staticmethod - def _create_alchemical_ions(system, charge_diff): + def _remove_gpu_from_pool(self, gpu_num): """ - Internal function to create alchemical ions to maintain a constant charge. + Removes a GPU from the GPU pool. Parameters ---------- - system: :class: `System ` - The system to be perturbed. + gpu_num: str + The GPU number to be removed from the pool. + """ + self._gpu_pool.remove(gpu_num) - charge_diff: int - The charge difference between perturbed and reference states. + @staticmethod + def _zero_gpu_devices(devices): + """ + Set all device numbers relative to the lowest (the device number becomes + equal to its index in the list). Returns ------- - system: :class: `System ` - The perturbed system with alchemical ions added. + devices : [int] + List of zeroed available device numbers. """ + return [str(devices.index(value)) for value in devices] - from sire.legacy.IO import createChlorineIon as _createChlorineIon - from sire.legacy.IO import createSodiumIon as _createSodiumIon - - # Clone the system. - system = system.clone() - - # The number of waters to convert is the absolute charge difference. - num_waters = abs(charge_diff) + def run(self): + """ + Use concurrent.futures to run lambda windows in parallel - # Make sure there are enough waters to convert. The charge difference should - # never be this large, but it prevents a crash if it is. - if num_waters > len(system["water"].molecules()): - raise ValueError( - f"Insufficient waters to convert to ions. {num_waters} required, " - f"{len(system['water'].molecules())} available." - ) + Returns + -------- - # Reference coordinates. - coords = system.molecules("property is_perturbable").coordinates() - coord_string = f"{coords[0].value()}, {coords[1].value()}, {coords[2].value()}" - - # Find the furthest N waters from the perturbable molecule. - waters = system[f"furthest {num_waters} waters from {coord_string}"].molecules() - - # Determine the water model. - if waters[0].num_atoms() == 3: - model = "tip3p" - elif waters[0].num_atoms() == 4: - model = "tip4p" - elif waters[0].num_atoms() == 5: - # Note that AMBER has no ion model for tip5p. - model = "tip4p" - - # Store the molecule numbers for the system. - numbers = system.numbers() - - # Create the ions. - for water in waters: - # Flag to indicate whether we need to reverse the alchemical ion - # perturbation, i.e. ion to water, rather than water to ion. - is_reverse = False - - # Create an ion to keep the charge constant throughout the - # perturbation. - if charge_diff > 0: - # Try to find a free chlorine ion so that we match parameters. - try: - has_ion = False - ions = system["element Cl"].molecules() - for ion in ions: - if ion.num_atoms() == 1: - has_ion = True - _logger.debug("Found Cl- ion in system.") - break - - # If there isn't an ion, then try searching for a free sodium ion. - if not has_ion: - ions = system["element Na"].molecules() - for ion in ions: - if ion.num_atoms() == 1: - has_ion = True - is_reverse = True - _logger.debug("Found Na+ ion in system.") - break - - # If not found, create one using a template. - if not has_ion: - _logger.debug(f"Creating Cl- ion from {model} water template.") - ion = _createChlorineIon( - water["element O"].coordinates(), model - ) + results : [bool] + List of simulation results. (Currently whether the simulation finished + successfully or not.) + """ + results = self._manager.list() - # If not found, create one using a template. - except: - _logger.debug(f"Creating Cl- ion from {model} water template.") - ion = _createChlorineIon(water["element O"].coordinates(), model) + # Create shared resources. + self._create_shared_resources() - # Create the ion string. - if is_reverse: - ion_str = "Na+" - else: - ion_str = "Cl-" + # Work out the number of workers and the resources for each. + # CPU platform. + if self._config.platform == "cpu": + # If the number of lambda windows is greater than the number of threads, then + # the number of threads per worker is set to 1. + if self._config.num_lambda > self._config.max_threads: + self.max_workers = self._config.max_threads + threads_per_worker = 1 + # Otherwise, divide the number of threads equally between workers. else: - # Try to find a free sodium ion so that we match parameters. - try: - has_ion = False - ions = system["element Na"].molecules() - for ion in ions: - if ion.num_atoms() == 1: - has_ion = True - _logger.debug("Found Na+ ion in system.") - break - - # If there isn't an ion, then try searching for a free chlorine ion. - if not has_ion: - ions = system["element Cl"].molecules() - for ion in ions: - if ion.num_atoms() == 1: - has_ion = True - is_reverse = True - _logger.debug("Found Cl- ion in system.") - break - - # If not found, create one using a template. - if not has_ion: - _logger.debug(f"Creating Na+ ion from {model} water template.") - ion = _createSodiumIon(water["element O"].coordinates(), model) - - # If not found, create one using a template. - except: - _logger.debug(f"Creating Na+ ion from {model} water template.") - ion = _createSodiumIon(water["element O"].coordinates(), model) - - # Create the ion string. - if is_reverse: - ion_str = "Cl-" - else: - ion_str = "Na+" - - # Create an alchemical ion: ion --> water. - if is_reverse: - merged = _morph.merge(ion, water, map={"as_new_molecule": False}) - # Create an alchemical ion: water --> ion. - else: - merged = _morph.merge(water, ion, map={"as_new_molecule": False}) - - # Update the system. - system.update(merged) + self.max_workers = self._config.num_lambda + threads_per_worker = self._config.max_threads // self._config.num_lambda + self._config._extra_args["threads"] = threads_per_worker - # Get the index of the perturbed water. - index = numbers.index(water.number()) - - # Log that we are adding an alchemical ion. - if is_reverse: - _logger.info( - f"Water at molecule index {index} will be perturbed from a " - f"{ion_str} ion to keep charge constant." - ) - else: - _logger.info( - f"Water at molecule index {index} will be perturbed to a " - f"{ion_str} ion to keep charge constant." - ) + # GPU platform. + elif self._is_gpu: + self.max_workers = len(self._gpu_pool) - return system + # All other platforms. + else: + self._max_workers = 1 - def _check_directory(self): - """ - Find the name of the file from which simulations will be started. - Paired with the 'restart' option. - """ - from pathlib import Path as __Path - from ._dynamics import Dynamics - - fnames = {} - deleted = [] - for lambda_value in self._lambda_values: - files = Dynamics.create_filenames( - self._lambda_values, - lambda_value, - self._config.output_directory, - self._config.restart, - ) - fnames[lambda_value] = files - if not self._config.restart: - for file in files.values(): - fullpath = self._config.output_directory / file - if __Path.exists(fullpath): - deleted.append(fullpath) - if len(deleted) > 0: - if not self._config.overwrite: - deleted_str = [str(file) for file in deleted] - _logger.warning( - f"The following files already exist, use --overwrite to overwrite them: {list(set((deleted_str)))} \n" - ) - exit(1) - # Loop over files to be deleted, ignoring duplicates - for file in list(set(deleted)): - file.unlink() - self._fnames = fnames + import concurrent.futures as _futures - @staticmethod - def _compare_configs(config1, config2): - if not isinstance(config1, dict): - raise TypeError("'config1' must be of type 'dict'") - if not isinstance(config2, dict): - raise TypeError("'config2' must be of type 'dict'") - - from sire.units import GeneralUnit as _GeneralUnit - - # Define the subset of settings that are allowed to change after restart - allowed_diffs = [ - "runtime", - "restart", - "minimise", - "max_threads", - "equilibration_time", - "equilibration_timestep", - "equilibration_constraints", - "energy_frequency", - "save_trajectory", - "frame_frequency", - "save_velocities", - "checkpoint_frequency", - "platform", - "max_threads", - "max_gpus", - "run_parallel", - "restart", - "save_trajectories", - "write_config", - "log_level", - "log_file", - "overwrite", - ] - for key in config1.keys(): - if key not in allowed_diffs: - # Extract the config values. - v1 = config1[key] - v2 = config2[key] - - # Convert GeneralUnits to strings for comparison. - if isinstance(v1, _GeneralUnit): - v1 = str(v1) - if isinstance(v2, _GeneralUnit): - v2 = str(v2) - - # If one is from sire and the other is not, will raise error even though they are the same - if (v1 == None and v2 == False) or (v2 == None and v1 == False): - continue - elif v1 != v2: - raise ValueError( - f"{key} has changed since the last run. This is not allowed when using the restart option." - ) + with _futures.ProcessPoolExecutor(max_workers=self.max_workers) as executor: + jobs = {} + for index, lambda_value in enumerate(self._lambda_values): + jobs[executor.submit(self.run_window, index)] = lambda_value + try: + for job in _futures.as_completed(jobs): + lambda_value = jobs[job] + try: + result = job.result() + except Exception as e: + result = False - def _verify_restart_config(self): - """ - Verify that the config file matches the config file used to create the - checkpoint file. - """ - import yaml as _yaml - - def get_last_config(output_directory): - """ - Returns the last config file in the output directory. - """ - import os as _os - - config_files = [ - file - for file in _os.listdir(output_directory) - if file.endswith(".yaml") and file.startswith("config") - ] - config_files.sort() - return config_files[-1] + _logger.error( + f"Exception raised for {_lam_sym} = {lambda_value}: {e}" + ) + with self._lock: + results.append(result) - try: - last_config_file = get_last_config(self._config.output_directory) - with open(self._config.output_directory / last_config_file) as file: - _logger.debug(f"Opening config file {last_config_file}") - self.last_config = _yaml.safe_load(file) - cfg_curr = self._config.as_dict() - except: - _logger.info( - f"""No config files found in {self._config.output_directory}, - attempting to retrieve config from lambda = 0 checkpoint file.""" - ) - try: - system_temp = _stream.load( - str(self._config.output_directory / "checkpoint_0.00000.s3") - ) - except: - expdir = self._config.output_directory / "checkpoint_0.00000.s3" - _logger.error(f"Unable to load checkpoint file from {expdir}.") - raise - else: - self.last_config = dict(system_temp.property("config")) - cfg_curr = self._config.as_dict(sire_compatible=True) - del system_temp + # Kill all current and future jobs if keyboard interrupt. + except KeyboardInterrupt: + _logger.error("Cancelling job...") + for pid in executor._processes: + executor._processes[pid].terminate() - self._compare_configs(self.last_config, cfg_curr) + return results - @staticmethod - def _systems_are_same(system0, system1): - """Check for equivalence between a pair of sire systems. + def run_window(self, index): + """ + Run a single lamdba window. Parameters ---------- - system0: sire.system.System - The first system to be compared. - system1: sire.system.System - The second system to be compared. - """ - if not isinstance(system0, _System): - raise TypeError("'system0' must be of type 'sire.system.System'") - if not isinstance(system1, _System): - raise TypeError("'system1' must be of type 'sire.system.System'") - - # Check for matching uids - if not system0._system.uid() == system1._system.uid(): - reason = "uids do not match" - return False, reason - - # Check for matching number of molecules - if not len(system0.molecules()) == len(system1.molecules()): - reason = "number of molecules do not match" - return False, reason - - # Check for matching number of residues - if not len(system0.residues()) == len(system1.residues()): - reason = "number of residues do not match" - return False, reason - - # Check for matching number of atoms - if not len(system0.atoms()) == len(system1.atoms()): - reason = "number of atoms do not match" - return False, reason - - return True, None - - def get_config(self): - """ - Returns a dictionary of configuration options. + index: int + The index of the window. Returns ------- - config: dict - Dictionary of simulation options. + success: bool + Whether the simulation was successful. """ - return self._config.as_dict() - def _update_gpu_pool(self, gpu_num): - """ - Updates the GPU pool to remove the GPU that has been assigned to a worker. + # Get the lambda value. + lambda_value = self._lambda_values[index] - Parameters - ---------- + if self._is_restart: + _logger.debug(f"Restarting {_lam_sym} = {lambda_value} from file") + system = self._system[index].clone() - gpu_num: str - The GPU number to be added to the pool. - """ - self._gpu_pool.append(gpu_num) + time = system.time() + if time > self._config.runtime - self._config.timestep: + _logger.success( + f"{_lam_sym} = {lambda_value} already complete. Skipping." + ) + return True + else: + _logger.info( + f"Restarting {_lam_sym} = {lambda_value} at time {time}, " + f"time remaining = {self._config.runtime - time}" + ) + else: + system = self._system.clone() - def _remove_gpu_from_pool(self, gpu_num): - """ - Removes a GPU from the GPU pool. + # GPU platform. + if self._is_gpu: + # Get a GPU from the pool. + with self._lock: + gpu_num = self._gpu_pool[0] + self._remove_gpu_from_pool(gpu_num) + if lambda_value is not None: + _logger.info( + f"Running {_lam_sym} = {lambda_value} on GPU {gpu_num}" + ) - Parameters - ---------- + # Run the smullation. + try: + self._run( + system, + index, + device=gpu_num, + is_restart=self._is_restart, + ) - gpu_num: str - The GPU number to be removed from the pool. - """ - self._gpu_pool.remove(gpu_num) + with self._lock: + self._update_gpu_pool(gpu_num) + except: + with self._lock: + self._update_gpu_pool(gpu_num) + raise - def _set_lambda_schedule(self, schedule): - """ - Sets the lambda schedule. + # All other platforms. + else: + _logger.info(f"Running {_lam_sym} = {lambda_value}") - Parameters - ---------- + # Run the simulation. + try: + self._run(system, index, is_restart=self._is_restart) + except: + raise - schedule: sr.cas.LambdaSchedule - Lambda schedule to be set. - """ - self._config.lambda_schedule = schedule + return True - @staticmethod - def _get_gpu_devices(platform): + def _run( + self, system, index, device=None, lambda_minimisation=None, is_restart=False + ): """ - Get list of available GPUs from CUDA_VISIBLE_DEVICES, - OPENCL_VISIBLE_DEVICES, or HIP_VISIBLE_DEVICES. + Run the simulation with bookkeeping. Parameters ---------- - platform: str - The GPU platform to be used for simulations. - - Returns - -------- - - available_devices : [int] - List of available device numbers. - """ - - if not isinstance(platform, str): - raise TypeError("'platform' must be of type 'str'") - - platform = platform.lower().replace(" ", "") - - if platform not in ["cuda", "opencl", "hip"]: - raise ValueError("'platform' must be one of 'cuda', 'opencl', or 'hip'.") + system: :class: `System ` + The system to be simulated. - import os as _os + index: int + The index of the lambda window. - if platform == "cuda": - if _os.environ.get("CUDA_VISIBLE_DEVICES") is None: - raise ValueError("CUDA_VISIBLE_DEVICES not set") - else: - available_devices = _os.environ.get("CUDA_VISIBLE_DEVICES").split(",") - _logger.info(f"CUDA_VISIBLE_DEVICES set to {available_devices}") - elif platform == "opencl": - if _os.environ.get("OPENCL_VISIBLE_DEVICES") is None: - raise ValueError("OPENCL_VISIBLE_DEVICES not set") - else: - available_devices = _os.environ.get("OPENCL_VISIBLE_DEVICES").split(",") - _logger.info(f"OPENCL_VISIBLE_DEVICES set to {available_devices}") - elif platform == "hip": - if _os.environ.get("HIP_VISIBLE_DEVICES") is None: - raise ValueError("HIP_VISIBLE_DEVICES not set") - else: - available_devices = _os.environ.get("HIP_VISIBLE_DEVICES").split(",") - _logger.info(f"HIP_VISIBLE_DEVICES set to {available_devices}") + device: int + The GPU device number to be used for the simulation. - num_gpus = len(available_devices) - _logger.info(f"Number of GPUs available: {num_gpus}") + lambda_minimisation: float + The lambda value for the minimisation. - return available_devices - - @staticmethod - def _zero_gpu_devices(devices): - """ - Set all device numbers relative to the lowest - (the device number becomes equal to its index in the list). + is_restart: bool + Whether this is a restart simulation. Returns ------- - devices : [int] - List of zeroed available device numbers. + df : pandas dataframe + Dataframe containing the sire energy trajectory. """ - return [str(devices.index(value)) for value in devices] - @staticmethod - def _get_h_mass_factor(system): - """ - Get the current hydrogen mass factor. + # Get the lambda value. + lambda_value = self._lambda_values[index] - Parameters - ---------- + # Check for completion if this is a restart. + if is_restart: + time = system.time() + if time > self._config.runtime - self._config.timestep: + _logger.success( + f"{_lam_sym} = {lambda_value} already complete. Skipping." + ) + return - system : :class: `System ` - The system of interest. - """ + # Work out the current block number. + self._start_block = int( + round(time.value() / self._config.checkpoint_frequency.value(), 12) + ) - from sire.mol import Element + # Subtract the current time from the runtime. + time = self._config.runtime - time + else: + self._start_block = 0 + time = self._config.runtime + + def generate_lam_vals(lambda_base, increment=0.001): + """Generate lambda values for a given lambda_base and increment""" + if lambda_base + increment > 1.0 and lambda_base - increment < 0.0: + raise ValueError("Increment too large") + if lambda_base + increment > 1.0: + lam_vals = [lambda_base - increment] + elif lambda_base - increment < 0.0: + lam_vals = [lambda_base + increment] + else: + lam_vals = [lambda_base - increment, lambda_base + increment] + return lam_vals + + # Minimisation. + if self._config.minimise: + # Minimise with no constraints if we need to equilibrate first. + # This seems to improve the stability of the equilibration. + if self._config.equilibration_time.value() > 0.0 and not is_restart: + constraint = "none" + perturbable_constraint = "none" + else: + constraint = self._config.constraint + perturbable_constraint = self._config.perturbable_constraint - # Store the expected hydrogen mass. - expected_h_mass = Element("H").mass().value() + system = self._minimisation( + system, + lambda_value, + device=device, + constraint=constraint, + perturbable_constraint=perturbable_constraint, + ) - # Get the mass of the first hydrogen atom. - try: - h_mass = system["element H"][0].mass() - except: - return expected_h_mass, False + # Equilibration. + if self._config.equilibration_time.value() > 0.0 and not is_restart: + # Run without saving energies or frames. + + # Copy the dynamics kwargs. + dynamics_kwargs = self._dynamics_kwargs.copy() + + # Overload the dynamics kwargs with the simulation options. + dynamics_kwargs.update( + { + "device": device, + "lambda_value": lambda_value, + "constraint": ( + "none" + if not self._equilibration_constraints + else self._config.constraint + ), + "perturbable_constraint": ( + "none" + if not self._equilibration_constraints + else self._config.perturbable_constraint + ), + "save_velocities": False, + "auto_fix_minimise": False, + } + ) - # Work out the current hydrogen mass factor. We round to 3dp due to - # the precision of atomic masses loaded from text files. - return round(h_mass.value() / expected_h_mass, 3), True + # Create the dynamics object. + dynamics = system.dynamics(**dynamics_kwargs) - @staticmethod - def _repartition_h_mass(system, factor=1.0): - """ - Repartition hydrogen masses. + # Run without saving energies or frames. + dynamics.run( + self._config.equilibration_time, + energy_frequency=0, + frame_frequency=0, + ) - Parameters - ---------- + # Commit the system. + system = dynamics.commit() + + # Reset the timer to zero + system.set_time(_sr.u("0ps")) + + # Perform minimisation at the end of equilibration only if the + # timestep is increasing, or the constraint is changing. + if (self._config.timestep > self._config.equilibration_timestep) or ( + not self._config.equilibration_constraints + and self._config.perturbable_constraint != "none" + ): + self._minimisation( + system, + lambda_min=lambda_value, + device=device, + constraint=self._config.constraint, + perturbable_constraint=self._config.perturbable_constraint, + ) - system : :class: `System ` - The system to be repartitioned. + # Work out the lambda values for finite-difference gradient analysis. + lambda_grad = generate_lam_vals(lambda_value) - factor :float - The factor by which hydrogen masses will be scaled. + # Create the array of lambda values for energy sampling. + lambda_energy = self._lambda_energy.copy() - Returns - ------- + # If missing, add the lambda value. + if lambda_value not in self._lambda_energy: + lambda_energy.append(lambda_value) - system : :class: `System ` - The repartitioned system. - """ + # Sort the lambda values. + lambda_energy = sorted(lambda_energy) - if not isinstance(factor, float): - raise TypeError("'factor' must be of type 'float'") + # Create the lambda array. + lambda_array = lambda_energy.copy() - from math import isclose + # Add the lambda values for finite-difference gradient analysis. + lambda_array.extend(lambda_grad) - # Early exit if no repartitioning is required. - if isclose(factor, 1.0, abs_tol=1e-4): - return system + # Sort the lambda values. + lambda_array = sorted(lambda_array) - from sire.morph import ( - repartition_hydrogen_masses as _repartition_hydrogen_masses, - ) + _logger.info(f"Running dynamics at {_lam_sym} = {lambda_value:.5f}") - _logger.info(f"Repartitioning hydrogen masses with factor {factor:.3f}") + # Copy the dynamics kwargs. + dynamics_kwargs = self._dynamics_kwargs.copy() - return _repartition_hydrogen_masses( - system, - mass_factor=factor, + # Overload the dynamics kwargs with the simulation options. + dynamics_kwargs.update( + { + "device": device, + "lambda_value": lambda_value, + } ) - def _initialise_simulation(self, system, lambda_value, device=None): - """ - Create simulation object. - - Parameters - ---------- + # Create the dynamics object. + dynamics = system.dynamics(**dynamics_kwargs) - system: :class: `System ` - The system to be simulated. + # Run the simulation, checkpointing in blocks. + if self._config.checkpoint_frequency.value() > 0.0: - lambda_value: float - The lambda value for the simulation. + # Calculate the number of blocks and the remainder time. + frac = (time / self._config.checkpoint_frequency).value() - device: int - The GPU device number to be used for the simulation. - """ - from ._dynamics import Dynamics + # Handle the case where the runtime is less than the checkpoint frequency. + if frac < 1.0: + frac = 1.0 + self._config.checkpoint_frequency = f"{time} ps" - try: - self._sim = Dynamics( - system, - lambda_val=lambda_value, - lambda_array=self._lambda_values, - lambda_energy=self._lambda_energy, - config=self._config, - device=device, - has_space=self._has_space, - ) - except: - _logger.warning(f"System creation at {_lam_sym} = {lambda_value} failed") - raise - - def _cleanup_simulation(self): - """ - Used to delete simulation objects once the required data has been extracted. - """ - self._sim._cleanup() - - def run(self): - """ - Use concurrent.futures to run lambda windows in parallel + num_blocks = int(frac) + rem = round(frac - num_blocks, 12) - Returns - -------- + # Run the dynamics in blocks. + for block in range(int(num_blocks)): + # Add the start block number. + block += self._start_block - results : [bool] - List of simulation results. (Currently whether the simulation finished - successfully or not.) - """ - results = self._manager.list() - if self._config.run_parallel and (self._config.num_lambda is not None): - # Create shared resources. - self._create_shared_resources() - - # Work out the number of workers and the resources for each. - - # CPU platform. - if self._config.platform == "cpu": - # If the number of lambda windows is greater than the number of threads, then - # the number of threads per worker is set to 1. - if self._config.num_lambda > self._config.max_threads: - self.max_workers = self._config.max_threads - threads_per_worker = 1 - # Otherwise, divide the number of threads equally between workers. - else: - self.max_workers = self._config.num_lambda - threads_per_worker = ( - self._config.max_threads // self._config.num_lambda + # Run the dynamics. + try: + dynamics.run( + self._config.checkpoint_frequency, + energy_frequency=self._config.energy_frequency, + frame_frequency=self._config.frame_frequency, + lambda_windows=lambda_array, + save_velocities=self._config.save_velocities, + auto_fix_minimise=False, ) - self._config._extra_args["threads"] = threads_per_worker + except: + raise - # (Multi-)GPU platform. - elif self._is_gpu: - self.max_workers = len(self._gpu_pool) + # Checkpoint. + try: + # Save the energy contribution for each force. + if self._config.save_energy_components: + self._save_energy_components() + + # Commit the current system. + system = dynamics.commit() + + # Get the simulation speed. + speed = dynamics.time_speed() + + # Check if this is the final block. + is_final_block = ( + block - self._start_block + ) == num_blocks - 1 and rem == 0 + + # Checkpoint. + self._checkpoint( + system, + index, + block, + speed, + lambda_energy=lambda_energy, + lambda_grad=lambda_grad, + is_final_block=is_final_block, + ) - # All other platforms. - else: - self._max_workers = 1 + _logger.info( + f"Finished block {block+1} of {self._start_block + num_blocks} " + f"for {_lam_sym} = {lambda_value:.5f}" + ) - import concurrent.futures as _futures + if is_final_block: + _logger.success( + f"{_lam_sym} = {lambda_value:.5f} complete, speed = {speed:.2f} ns day-1" + ) + except: + raise - with _futures.ProcessPoolExecutor(max_workers=self.max_workers) as executor: - jobs = {} - for lambda_value in self._lambda_values: - jobs[executor.submit(self.run_window, lambda_value)] = lambda_value - try: - for job in _futures.as_completed(jobs): - lambda_value = jobs[job] - try: - result = job.result() - except Exception as e: - result = False - - _logger.error( - f"Exception raised for {_lam_sym} = {lambda_value}: {e}" - ) - with self._lock: - results.append(result) - # Kill all current and future jobs if keyboard interrupt. - except KeyboardInterrupt: - for pid in executor._processes: - executor._processes[pid].terminate() - - # Serial configuration. - elif self._config.num_lambda is not None: - if self._config.platform == "cpu": - self._config._extra_args = {"threads": self._config.max_threads} - self._lambda_values = [ - round(i / (self._config.num_lambda - 1), 5) - for i in range(0, self._config.num_lambda) - ] - for lambda_value in self._lambda_values: + # Handle the remainder time. + if rem > 0: + block += 1 try: - result = self.run_window(lambda_value) - except Exception as e: - result = False + dynamics.run( + rem, + energy_frequency=self._config.energy_frequency, + frame_frequency=self._config.frame_frequency, + lambda_windows=lambda_array, + save_velocities=self._config.save_velocities, + auto_fix_minimise=False, + ) - _logger.error( - f"Exception raised for {_lam_sym} = {lambda_value}: {e}" + # Save the energy contribution for each force. + if self._config.save_energy_components: + self._save_energy_components() + + # Commit the current system. + system = dynamics.commit() + + # Get the simulation speed. + speed = dynamics.time_speed() + + # Checkpoint. + self._checkpoint( + system, + index, + block, + speed, + lambda_energy=lambda_energy, + lambda_grad=lambda_grad, + is_final_block=True, ) - results.append(result) + _logger.info( + f"Finished block {block+1} of {self._start_block + num_blocks} " + f"for {_lam_sym} = {lambda_value:.5f}" + ) + + _logger.success( + f"{_lam_sym} = {lambda_value:.5f} complete, speed = {speed:.2f} ns day-1" + ) + except Exception as e: + raise else: - raise ValueError( - "Vanilla MD not currently supported. Please set num_lambda > 1." - ) + try: + dynamics.run( + self._config.checkpoint_frequency, + energy_frequency=self._config.energy_frequency, + frame_frequency=self._config.frame_frequency, + lambda_windows=lam_arr, + save_velocities=self._config.save_velocities, + auto_fix_minimise=False, + ) + except: + raise - return results + # Commit the current system. + system = dynamics.commit() + + # Get the simulation speed. + speed = dynamics.time_speed() + + # Checkpoint. + self._checkpoint(system, index, 0, speed, is_final_block=True) - def run_window(self, lambda_value): + _logger.success( + f"{_lam_sym} = {lambda_value:.5f} complete, speed = {speed:.2f} ns day-1" + ) + + def _minimisation( + self, + system, + lambda_value, + device=None, + constraint="none", + perturbable_constraint="none", + ): """ - Run a single simulation. + Minimise a system. Parameters ---------- + system: str, :class: `System ` + The system to minimise. + lambda_value: float - The lambda value for the simulation. + Lambda value at which to run minimisation. - temperature: float - The temperature for the simulation. + device: int + The GPU device number to be used for the simulation. - Returns - ------- + constraint: str + The constraint for non-perturbable molecules. - result: str - The result of the simulation. + perturbable_constraint: str + The constraint for perturbable molecules. """ - def _run(sim, is_restart=False): - # This function is complex due to the mixture of options for minimisation and dynamics - if self._config.minimise: - try: - df = sim._run(is_restart=is_restart) - lambda_grad = sim._lambda_grad - speed = sim.get_timing() - return df, lambda_grad, speed - except Exception as e: - _logger.warning( - f"Minimisation/dynamics at {_lam_sym} = {lambda_value} failed with the " - f"following exception {e}, trying again with minimsation at {_lam_sym} = 0." - ) - try: - df = sim._run(lambda_minimisation=0.0, is_restart=is_restart) - lambda_grad = sim._lambda_grad - speed = sim.get_timing() - return df, lambda_grad, speed - except Exception as e: - _logger.error( - f"Minimisation/dynamics at {_lam_sym} = {lambda_value} failed, even after " - f"minimisation at {_lam_sym} = 0. The following warning was raised: {e}." - ) - raise - else: - try: - df = sim._run(is_restart) - lambda_grad = sim._lambda_grad - speed = sim.get_timing() - return df, lambda_grad, speed - except Exception as e: - _logger.error( - f"Dynamics at {_lam_sym} = {lambda_value} failed. The following warning was " - f"raised: {e}. This may be due to a lack of minimisation." - ) + _logger.info(f"Minimising at {_lam_sym} = {lambda_value:.5f}") - if self._config.restart: - _logger.debug(f"Restarting {_lam_sym} = {lambda_value} from file") - try: - system = _stream.load( - str( - self._config.output_directory - / self._fnames[lambda_value]["checkpoint"] - ) - ).clone() - except: - _logger.warning( - f"Unable to load checkpoint file for {_lam_sym}={lambda_value}, starting from scratch." - ) - system = self._system.clone() - is_restart = False - else: - aresame, reason = self._systems_are_same(self._system, system) - if not aresame: - raise ValueError( - f"Checkpoint file does not match system for the following reason: {reason}." - ) - try: - self._compare_configs( - self.last_config, dict(system.property("config")) - ) - except: - cfg_here = dict(system.property("config")) - _logger.debug( - f"last config: {self.last_config}, current config: {cfg_here}" - ) - _logger.error( - f"Config for {_lam_sym}={lambda_value} does not match previous config." - ) - raise - else: - lambda_encoded = system.property("lambda") - try: - lambda_encoded == lambda_value - except: - fname = self._fnames[lambda_value]["checkpoint"] - raise ValueError( - f"Lambda value from checkpoint file {fname} ({lambda_encoded}) does not match expected value ({lambda_value})." - ) - is_restart = True - else: - system = self._system.clone() - is_restart = False - if is_restart: - acc_time = system.time() - if acc_time > self._config.runtime - self._config.timestep: - _logger.success( - f"{_lam_sym} = {lambda_value} already complete. Skipping." - ) - return True - else: - _logger.info( - f"Restarting {_lam_sym} = {lambda_value} at time {acc_time}, time remaining = {self._config.runtime - acc_time}" - ) - # GPU platform. - if self._is_gpu: - if self._config.run_parallel: - with self._lock: - gpu_num = self._gpu_pool[0] - self._remove_gpu_from_pool(gpu_num) - if lambda_value is not None: - _logger.info( - f"Running {_lam_sym} = {lambda_value} on GPU {gpu_num}" - ) - # Assumes that device for non-parallel GPU jobs is 0 - else: - gpu_num = 0 - _logger.info(f"Running {_lam_sym} = {lambda_value} on GPU 0") - self._initialise_simulation(system, lambda_value, device=gpu_num) - try: - df, lambda_grad, speed = _run(self._sim, is_restart=is_restart) - except: - if self._config.run_parallel: - with self._lock: - self._update_gpu_pool(gpu_num) - raise - else: - if self._config.run_parallel: - with self._lock: - self._update_gpu_pool(gpu_num) - self._sim._cleanup() + # Copy the dynamics kwargs. + dynamics_kwargs = self._dynamics_kwargs.copy() - # All other platforms. - else: - _logger.info(f"Running {_lam_sym} = {lambda_value}") + # Overload the dynamics kwargs with the minimisation options. + dynamics_kwargs.update( + { + "device": device, + "lambda_value": lambda_value, + "constraint": constraint, + "perturbable_constraint": perturbable_constraint, + } + ) - self._initialise_simulation(system, lambda_value) - try: - df, lambda_grad, speed = _run(self._sim, is_restart=is_restart) - except: - raise - self._sim._cleanup() + try: + # Create a dynamics object. + dynamics = system.dynamics(**dynamics_kwargs) - from somd2 import __version__, _sire_version, _sire_revisionid + # Run the minimisation. + dynamics.minimise(timeout=self._config.timeout) - # Add the current lambda value to the list of lambda values and sort. - lambda_array = self._lambda_energy.copy() - if lambda_value not in lambda_array: - lambda_array.append(lambda_value) - lambda_array = sorted(lambda_array) + # Commit the system. + system = dynamics.commit() + except: + raise - # Write final dataframe for the system to the energy trajectory file. - # Note that sire s3 checkpoint files contain energy trajectory data, so this works even for restarts. - _ = _dataframe_to_parquet( - df, - metadata={ - "attrs": df.attrs, - "somd2 version": __version__, - "sire version": f"{_sire_version}+{_sire_revisionid}", - "lambda": str(lambda_value), - "lambda_array": lambda_array, - "lambda_grad": lambda_grad, - "speed": speed, - "temperature": str(self._config.temperature.value()), - }, - filepath=self._config.output_directory, - filename=self._fnames[lambda_value]["energy_traj"], - ) - del system - _logger.success( - f"{_lam_sym} = {lambda_value} complete, speed = {speed:.2f} ns day-1" - ) - return True + return system diff --git a/tests/io/test_io.py b/tests/io/test_io.py index f384726..56e65fa 100644 --- a/tests/io/test_io.py +++ b/tests/io/test_io.py @@ -1,6 +1,5 @@ from somd2.io import ( dataframe_to_parquet, - dict_to_yaml, parquet_append, parquet_to_dataframe, ) diff --git a/tests/runner/test_config.py b/tests/runner/test_config.py index c69b61d..cfb14be 100644 --- a/tests/runner/test_config.py +++ b/tests/runner/test_config.py @@ -15,67 +15,49 @@ def test_dynamics_options(): # Load the demo stream file. mols = sr.load(sr.expand(sr.tutorial_url, "merged_molecule.s3")) + # Create a config object. + config = Config(platform="cpu", output_directory=tmpdir) + # Instantiate a runner using the default config. # (All default options, other than platform="cpu".) - runner = Runner(mols, Config(platform="cpu", output_directory=tmpdir)) + runner = Runner(mols, config) # Initalise a fake simulation. - runner._initialise_simulation(runner._system.clone(), 0.0) - - # Setup a dynamics object for equilibration. - runner._sim._setup_dynamics(equilibration=True) - - # Store the config object. - config_inp = runner._config - - # Store the dynamics object. - d = runner._sim._dyn - - # Timestep. - assert config_inp.equilibration_timestep == d.timestep() - - # Setup a dynamics object for production. - runner._sim._setup_dynamics(equilibration=False) - - # Store the dynamics object. - d = runner._sim._dyn + d = runner._system.dynamics(**runner._dynamics_kwargs) # Timestep. - assert str(config_inp.timestep).lower() == str(d.timestep()).lower() + assert str(config.timestep).lower() == str(d.timestep()).lower() # Schedule. assert ( - config_inp.lambda_schedule.to_string().lower() + config.lambda_schedule.to_string().lower() == d.get_schedule().to_string().lower() ) # Cutoff-type. - assert config_inp.cutoff_type.lower() == d.info().cutoff_type().lower() + assert config.cutoff_type.lower() == d.info().cutoff_type().lower() # Platform. - assert config_inp.platform.lower() == d.platform().lower() + assert config.platform.lower() == d.platform().lower() # Temperature and pressure. if not d.ensemble().is_micro_canonical(): assert ( - str(config_inp.temperature).lower() + str(config.temperature).lower() == str(d.ensemble().temperature()).lower() ) - assert ( - str(config_inp.pressure).lower() == str(d.ensemble().pressure()).lower() - ) + assert str(config.pressure).lower() == str(d.ensemble().pressure()).lower() # Constraint. - assert config_inp.constraint.lower() == d.constraint().lower() + assert config.constraint.lower() == d.constraint().lower() # Perturbable_constraint. assert ( - config_inp.perturbable_constraint.lower() - == d.perturbable_constraint().lower() + config.perturbable_constraint.lower() == d.perturbable_constraint().lower() ) # Integrator. - assert config_inp.integrator.lower().replace( + assert config.integrator.lower().replace( "_", "" ) == d.integrator().__class__.__name__.lower().replace("integrator", "")