Skip to content

Commit

Permalink
Merge pull request mala-project#485 from RandomDefaultUser/add_malada…
Browse files Browse the repository at this point in the history
…_feature

Add uncorrelated snapshot parsing from MALADA to MALA
  • Loading branch information
RandomDefaultUser authored Sep 20, 2023
2 parents b64eca2 + 4bc5adc commit 22bcefb
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 6 deletions.
13 changes: 13 additions & 0 deletions mala/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,17 @@ class ParametersDataGeneration(ParametersBase):
from the end). Usually, 10% is a fine assumption. This value usually
does not need to be changed.
trajectory_analysis_correlation_metric_cutoff : float
Cutoff value to be used when sampling uncorrelated snapshots
during trajectory analysis. If negative, a value will be determined
numerically. This value is a cutoff for the minimum euclidean distance
between any two ions in two subsequent ionic configurations.
trajectory_analysis_temperature_tolerance_percent : float
Maximum deviation of temperature between snapshot and desired
temperature for snapshot to be considered for DFT calculation
(in percent)
local_psp_path : string
Path to where the local pseudopotential is stored (for OF-DFT-MD).
Expand Down Expand Up @@ -1095,6 +1106,8 @@ def __init__(self):
self.trajectory_analysis_denoising_width = 100
self.trajectory_analysis_below_average_counter = 50
self.trajectory_analysis_estimated_equilibrium = 0.1
self.trajectory_analysis_correlation_metric_cutoff = -0.1
self.trajectory_analysis_temperature_tolerance_percent = 1
self.local_psp_path = None
self.local_psp_name = None
self.ofdft_timestep = 0
Expand Down
179 changes: 173 additions & 6 deletions mala/datageneration/trajectory_analyzer.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""Tools for analyzing a trajectory."""

from functools import cached_property
import os
from warnings import warn

from ase.io.trajectory import TrajectoryReader, Trajectory
from ase.io.trajectory import TrajectoryReader, Trajectory, TrajectoryWriter
import numpy as np
from scipy.spatial import distance

from mala.common.parameters import Parameters, ParametersDataGeneration
from mala.common.parameters import ParametersDataGeneration
from mala.common.parallelizer import printout
from mala.targets import Target
from mala.descriptors.descriptor import Descriptor


class TrajectoryAnalyzer:
Expand All @@ -27,26 +30,42 @@ class TrajectoryAnalyzer:
one will be generated ad-hoc (recommended).
"""

def __init__(self, parameters, trajectory, target_calculator=None):
def __init__(self, parameters, trajectory, temperatures=None,
target_calculator=None, target_temperature=None,
malada_compatability=False):
warn("The class TrajectoryAnalyzer is experimental. The algorithms "
"within have been tested, but the API may still be subject to "
"large changes.")

self.params: ParametersDataGeneration = parameters.datageneration

# Save or read the trajectory
# If needed, read the trajectory
self.trajectory = None
if isinstance(trajectory, TrajectoryReader):
self.trajectory = trajectory
elif isinstance(trajectory, str):
self.trajectory = Trajectory(trajectory)
else:
raise Exception("Incompatible trajectory format provided.")

# If needed, read the temperature files
self.temperatures = None
if temperatures is not None:
if isinstance(temperatures, np.ndarray):
self.temperatures = temperatures
elif isinstance(temperatures, str):
self.temperatures = np.load(temperatures)
else:
raise Exception("Incompatible temperature format provided.")

# Create target calculator.
self.target_calculator = target_calculator
if target_calculator is None:
self.target_calculator = Target(parameters)

if target_temperature is not None:
self.target_calculator.temperature = target_temperature

# Initialize variables.
self.distance_metrics = []
self.distance_metrics_denoised = []
Expand All @@ -55,6 +74,43 @@ def __init__(self, parameters, trajectory, target_calculator=None):
self.first_considered_snapshot = None
self.last_considered_snapshot = None

# MALADA used two times the minimum imaging convention (MIC) to
# determine the cutoff radius for the RDF.
# In the future, only the MIC radius should be used, but to
# recreate old plots, one may need to use two times the MIC.
if malada_compatability:
self.target_calculator.parameters.rdf_parameters["rMax"] = "2mic"

@property
def trajectory(self):
"""Trajectory to be analyzed."""
return self._trajectory

@trajectory.setter
def trajectory(self, new_trajectory):
self._trajectory = new_trajectory
self.uncache_properties()

def _is_property_cached(self, property_name):
return property_name in self.__dict__.keys()

def uncache_properties(self):
"""Uncache all cached properties of this calculator."""
if self._is_property_cached("first_snapshot"):
del self.first_snapshot
if self._is_property_cached("snapshot_correlation_cutoff"):
del self.snapshot_correlation_cutoff

@cached_property
def first_snapshot(self):
"""First equilibrated snapshot."""
return self.get_first_snapshot()

@cached_property
def snapshot_correlation_cutoff(self):
"""Cutoff for the snapshot correlation analysis."""
return self.get_snapshot_correlation_cutoff()

def get_first_snapshot(self, equilibrated_snapshot=None,
distance_threshold=None):
"""
Expand Down Expand Up @@ -133,9 +189,104 @@ def get_first_snapshot(self, equilibrated_snapshot=None,
first_snapshot = idx
break

print("First equilibrated timestep of trajectory is", first_snapshot)
printout("First equilibrated timestep of trajectory is", first_snapshot)
return first_snapshot

def get_snapshot_correlation_cutoff(self):
"""
Determine the cutoff for the distance metric.
If a cutoff is set by the user via the Parameters object, this
function simply returns this value. Elsewise, the value
is determined numerically. The distance metric used here is realspace
(i.e. the smallest displacement of an atom between two snapshots). The
cutoff gives a lower estimate for the oscillations of the trajectory.
Any distance above this cutoff can be attributed to the oscillations
in the trajectory. Any cutoff below is the consquence of temporal
neighborhood of these snapshots.
Returns
-------
cutoff : float
Cutoff below which two snapshots can be assumed to be similar
to each other to a degree that suggests temporal neighborhood.
"""
if self.params.trajectory_analysis_correlation_metric_cutoff < 0:
return self._analyze_distance_metric(self.trajectory)
else:
return self.params.trajectory_analysis_correlation_metric_cutoff

def get_uncorrelated_snapshots(self, filename_uncorrelated_snapshots):
"""
Calculate a set of uncorrelated configurations from a trajectory.
If not priorly determined with the object calling this function, the
first equilibrated configuration will be determined automatically.
This function will create two files, one archiving the MD step number
of all snapshots sampled, and one with the actual snapshots themselves.
Parameters
----------
filename_uncorrelated_snapshots : string
Name of the file in which to save the uncorrelated snapshots.
"""
filename_base = \
os.path.basename(filename_uncorrelated_snapshots).split(".")[0]
allowed_temp_diff_K = (self.params.
trajectory_analysis_temperature_tolerance_percent
/ 100) * self.target_calculator.temperature
current_snapshot = self.first_snapshot
begin_snapshot = self.first_snapshot+1
end_snapshot = len(self.trajectory)
j = 0
md_iteration = []
for i in range(begin_snapshot, end_snapshot):
if self.__check_if_snapshot_is_valid(self.trajectory[i],
self.temperatures[i],
self.trajectory[current_snapshot],
self.temperatures[current_snapshot],
self.snapshot_correlation_cutoff,
allowed_temp_diff_K):
current_snapshot = i
md_iteration.append(current_snapshot)
j += 1
np.random.shuffle(md_iteration)
for i in range(0, len(md_iteration)):
if i == 0:
traj_writer = TrajectoryWriter(filename_base+".traj", mode='w')
else:
traj_writer = TrajectoryWriter(filename_base+".traj", mode='a')
atoms_to_write = Descriptor.enforce_pbc(self.trajectory[md_iteration[i]])
traj_writer.write(atoms=atoms_to_write)
np.save(filename_base+"_numbers.npy", md_iteration)
printout(j, "possible snapshots found in MD trajectory.")

def _analyze_distance_metric(self, trajectory):
# distance metric usefdfor the snapshot parsing (realspace similarity
# of the snapshot), we first find the center of the equilibrated part
# of the trajectory and calculate the differences w.r.t to to it.
center = int((np.shape(self.distance_metrics_denoised)[
0] - self.first_snapshot) / 2) + self.first_snapshot
width = int(
self.params.trajectory_analysis_estimated_equilibrium *
np.shape(self.distance_metrics_denoised)[0])
self.distances_realspace = []
self.__saved_rdf = None
for i in range(center - width, center + width):
self.distances_realspace.append(
self._calculate_distance_between_snapshots(
trajectory[center], trajectory[i],
"realspace", "minimal_distance", save_rdf1=True))

# From these metrics, we assume mean - 2.576 std as limit.
# This translates to a confidence interval of ~99%, which should
# make any coincidental similarites unlikely.
cutoff = np.mean(self.distances_realspace) - 2.576 * np.std(
self.distances_realspace)
printout("Distance metric cutoff is", cutoff)
return cutoff

def _calculate_distance_between_snapshots(self, snapshot1, snapshot2,
distance_metric, reduction,
save_rdf1=False):
Expand Down Expand Up @@ -193,3 +344,19 @@ def __denoise(self, signal):
mode='same')
return denoised_signal

def __check_if_snapshot_is_valid(self, snapshot_to_test, temp_to_test,
reference_snapshot, reference_temp,
distance_metric,
allowed_temp_diff):
distance = self.\
_calculate_distance_between_snapshots(snapshot_to_test,
reference_snapshot,
"realspace",
"minimal_distance")
temp_diff = np.abs(temp_to_test-reference_temp)
if distance > distance_metric and temp_diff < allowed_temp_diff:
return True
else:
return False


0 comments on commit 22bcefb

Please sign in to comment.