diff --git a/mala/common/parameters.py b/mala/common/parameters.py index d63149193..314ea44ff 100644 --- a/mala/common/parameters.py +++ b/mala/common/parameters.py @@ -1068,6 +1068,17 @@ class ParametersDataGeneration(ParametersBase): from the end). Usually, 10% is a fine assumption. This value usually does not need to be changed. + trajectory_analysis_correlation_metric_cutoff : float + Cutoff value to be used when sampling uncorrelated snapshots + during trajectory analysis. If negative, a value will be determined + numerically. This value is a cutoff for the minimum euclidean distance + between any two ions in two subsequent ionic configurations. + + trajectory_analysis_temperature_tolerance_percent : float + Maximum deviation of temperature between snapshot and desired + temperature for snapshot to be considered for DFT calculation + (in percent) + local_psp_path : string Path to where the local pseudopotential is stored (for OF-DFT-MD). @@ -1095,6 +1106,8 @@ def __init__(self): self.trajectory_analysis_denoising_width = 100 self.trajectory_analysis_below_average_counter = 50 self.trajectory_analysis_estimated_equilibrium = 0.1 + self.trajectory_analysis_correlation_metric_cutoff = -0.1 + self.trajectory_analysis_temperature_tolerance_percent = 1 self.local_psp_path = None self.local_psp_name = None self.ofdft_timestep = 0 diff --git a/mala/datageneration/trajectory_analyzer.py b/mala/datageneration/trajectory_analyzer.py index c908f7981..548ad95c1 100644 --- a/mala/datageneration/trajectory_analyzer.py +++ b/mala/datageneration/trajectory_analyzer.py @@ -1,13 +1,16 @@ """Tools for analyzing a trajectory.""" - +from functools import cached_property +import os from warnings import warn -from ase.io.trajectory import TrajectoryReader, Trajectory +from ase.io.trajectory import TrajectoryReader, Trajectory, TrajectoryWriter import numpy as np from scipy.spatial import distance -from mala.common.parameters import Parameters, ParametersDataGeneration +from mala.common.parameters import ParametersDataGeneration +from mala.common.parallelizer import printout from mala.targets import Target +from mala.descriptors.descriptor import Descriptor class TrajectoryAnalyzer: @@ -27,14 +30,17 @@ class TrajectoryAnalyzer: one will be generated ad-hoc (recommended). """ - def __init__(self, parameters, trajectory, target_calculator=None): + def __init__(self, parameters, trajectory, temperatures=None, + target_calculator=None, target_temperature=None, + malada_compatability=False): warn("The class TrajectoryAnalyzer is experimental. The algorithms " "within have been tested, but the API may still be subject to " "large changes.") self.params: ParametersDataGeneration = parameters.datageneration - # Save or read the trajectory + # If needed, read the trajectory + self.trajectory = None if isinstance(trajectory, TrajectoryReader): self.trajectory = trajectory elif isinstance(trajectory, str): @@ -42,11 +48,24 @@ def __init__(self, parameters, trajectory, target_calculator=None): else: raise Exception("Incompatible trajectory format provided.") + # If needed, read the temperature files + self.temperatures = None + if temperatures is not None: + if isinstance(temperatures, np.ndarray): + self.temperatures = temperatures + elif isinstance(temperatures, str): + self.temperatures = np.load(temperatures) + else: + raise Exception("Incompatible temperature format provided.") + # Create target calculator. self.target_calculator = target_calculator if target_calculator is None: self.target_calculator = Target(parameters) + if target_temperature is not None: + self.target_calculator.temperature = target_temperature + # Initialize variables. self.distance_metrics = [] self.distance_metrics_denoised = [] @@ -55,6 +74,43 @@ def __init__(self, parameters, trajectory, target_calculator=None): self.first_considered_snapshot = None self.last_considered_snapshot = None + # MALADA used two times the minimum imaging convention (MIC) to + # determine the cutoff radius for the RDF. + # In the future, only the MIC radius should be used, but to + # recreate old plots, one may need to use two times the MIC. + if malada_compatability: + self.target_calculator.parameters.rdf_parameters["rMax"] = "2mic" + + @property + def trajectory(self): + """Trajectory to be analyzed.""" + return self._trajectory + + @trajectory.setter + def trajectory(self, new_trajectory): + self._trajectory = new_trajectory + self.uncache_properties() + + def _is_property_cached(self, property_name): + return property_name in self.__dict__.keys() + + def uncache_properties(self): + """Uncache all cached properties of this calculator.""" + if self._is_property_cached("first_snapshot"): + del self.first_snapshot + if self._is_property_cached("snapshot_correlation_cutoff"): + del self.snapshot_correlation_cutoff + + @cached_property + def first_snapshot(self): + """First equilibrated snapshot.""" + return self.get_first_snapshot() + + @cached_property + def snapshot_correlation_cutoff(self): + """Cutoff for the snapshot correlation analysis.""" + return self.get_snapshot_correlation_cutoff() + def get_first_snapshot(self, equilibrated_snapshot=None, distance_threshold=None): """ @@ -133,9 +189,104 @@ def get_first_snapshot(self, equilibrated_snapshot=None, first_snapshot = idx break - print("First equilibrated timestep of trajectory is", first_snapshot) + printout("First equilibrated timestep of trajectory is", first_snapshot) return first_snapshot + def get_snapshot_correlation_cutoff(self): + """ + Determine the cutoff for the distance metric. + + If a cutoff is set by the user via the Parameters object, this + function simply returns this value. Elsewise, the value + is determined numerically. The distance metric used here is realspace + (i.e. the smallest displacement of an atom between two snapshots). The + cutoff gives a lower estimate for the oscillations of the trajectory. + Any distance above this cutoff can be attributed to the oscillations + in the trajectory. Any cutoff below is the consquence of temporal + neighborhood of these snapshots. + + Returns + ------- + cutoff : float + Cutoff below which two snapshots can be assumed to be similar + to each other to a degree that suggests temporal neighborhood. + + """ + if self.params.trajectory_analysis_correlation_metric_cutoff < 0: + return self._analyze_distance_metric(self.trajectory) + else: + return self.params.trajectory_analysis_correlation_metric_cutoff + + def get_uncorrelated_snapshots(self, filename_uncorrelated_snapshots): + """ + Calculate a set of uncorrelated configurations from a trajectory. + + If not priorly determined with the object calling this function, the + first equilibrated configuration will be determined automatically. + This function will create two files, one archiving the MD step number + of all snapshots sampled, and one with the actual snapshots themselves. + + Parameters + ---------- + filename_uncorrelated_snapshots : string + Name of the file in which to save the uncorrelated snapshots. + """ + filename_base = \ + os.path.basename(filename_uncorrelated_snapshots).split(".")[0] + allowed_temp_diff_K = (self.params. + trajectory_analysis_temperature_tolerance_percent + / 100) * self.target_calculator.temperature + current_snapshot = self.first_snapshot + begin_snapshot = self.first_snapshot+1 + end_snapshot = len(self.trajectory) + j = 0 + md_iteration = [] + for i in range(begin_snapshot, end_snapshot): + if self.__check_if_snapshot_is_valid(self.trajectory[i], + self.temperatures[i], + self.trajectory[current_snapshot], + self.temperatures[current_snapshot], + self.snapshot_correlation_cutoff, + allowed_temp_diff_K): + current_snapshot = i + md_iteration.append(current_snapshot) + j += 1 + np.random.shuffle(md_iteration) + for i in range(0, len(md_iteration)): + if i == 0: + traj_writer = TrajectoryWriter(filename_base+".traj", mode='w') + else: + traj_writer = TrajectoryWriter(filename_base+".traj", mode='a') + atoms_to_write = Descriptor.enforce_pbc(self.trajectory[md_iteration[i]]) + traj_writer.write(atoms=atoms_to_write) + np.save(filename_base+"_numbers.npy", md_iteration) + printout(j, "possible snapshots found in MD trajectory.") + + def _analyze_distance_metric(self, trajectory): + # distance metric usefdfor the snapshot parsing (realspace similarity + # of the snapshot), we first find the center of the equilibrated part + # of the trajectory and calculate the differences w.r.t to to it. + center = int((np.shape(self.distance_metrics_denoised)[ + 0] - self.first_snapshot) / 2) + self.first_snapshot + width = int( + self.params.trajectory_analysis_estimated_equilibrium * + np.shape(self.distance_metrics_denoised)[0]) + self.distances_realspace = [] + self.__saved_rdf = None + for i in range(center - width, center + width): + self.distances_realspace.append( + self._calculate_distance_between_snapshots( + trajectory[center], trajectory[i], + "realspace", "minimal_distance", save_rdf1=True)) + + # From these metrics, we assume mean - 2.576 std as limit. + # This translates to a confidence interval of ~99%, which should + # make any coincidental similarites unlikely. + cutoff = np.mean(self.distances_realspace) - 2.576 * np.std( + self.distances_realspace) + printout("Distance metric cutoff is", cutoff) + return cutoff + def _calculate_distance_between_snapshots(self, snapshot1, snapshot2, distance_metric, reduction, save_rdf1=False): @@ -193,3 +344,19 @@ def __denoise(self, signal): mode='same') return denoised_signal + def __check_if_snapshot_is_valid(self, snapshot_to_test, temp_to_test, + reference_snapshot, reference_temp, + distance_metric, + allowed_temp_diff): + distance = self.\ + _calculate_distance_between_snapshots(snapshot_to_test, + reference_snapshot, + "realspace", + "minimal_distance") + temp_diff = np.abs(temp_to_test-reference_temp) + if distance > distance_metric and temp_diff < allowed_temp_diff: + return True + else: + return False + +