diff --git a/docs/source/apidoc/janus_core.rst b/docs/source/apidoc/janus_core.rst
index d4060eb9..3016d686 100644
--- a/docs/source/apidoc/janus_core.rst
+++ b/docs/source/apidoc/janus_core.rst
@@ -164,6 +164,16 @@ janus\_core.helpers.descriptors module
    :undoc-members:
    :show-inheritance:
 
+janus\_core.helpers.post_process module
+---------------------------------------
+
+.. automodule:: janus_core.helpers.post_process
+   :members:
+   :special-members:
+   :private-members:
+   :undoc-members:
+   :show-inheritance:
+
 janus\_core.helpers.train module
 --------------------------------
 
diff --git a/janus_core/calculations/md.py b/janus_core/calculations/md.py
index 21fdd857..38477bda 100644
--- a/janus_core/calculations/md.py
+++ b/janus_core/calculations/md.py
@@ -5,11 +5,11 @@
 from math import isclose
 from pathlib import Path
 import random
-from typing import Any, Optional
+from typing import Any, Optional, Union
 from warnings import warn
 
 from ase import Atoms, units
-from ase.io import write
+from ase.io import read, write
 from ase.md.langevin import Langevin
 from ase.md.npt import NPT as ASE_NPT
 from ase.md.velocitydistribution import (
@@ -18,11 +18,21 @@
     ZeroRotation,
 )
 from ase.md.verlet import VelocityVerlet
+
+try:
+    from ase.geometry.analysis import Analysis
+
+    ASE_GEOMETRY = True
+except ImportError:
+
+    ASE_GEOMETRY = False
+
 import numpy as np
 
 from janus_core.calculations.geom_opt import optimize
-from janus_core.helpers.janus_types import Ensembles, PathLike
+from janus_core.helpers.janus_types import Ensembles, PathLike, PostProcessKwargs
 from janus_core.helpers.log import config_logger
+from janus_core.helpers.post_process import compute_rdf, compute_vaf
 from janus_core.helpers.utils import FileNameMixin
 
 DENS_FACT = (units.m / 1.0e2) ** 3 / units.mol
@@ -97,6 +107,8 @@ class MolecularDynamics(FileNameMixin):  # pylint: disable=too-many-instance-att
         heating.
     temp_time : Optional[float]
         Time between heating steps, in fs. Default is None, which disables heating.
+    post_process_kwargs : Optional[PostProcessKwargs]
+        Keyword arguments to control post-processing operations.
     log_kwargs : Optional[dict[str, Any]]
         Keyword arguments to pass to log config. Default is None.
     seed : Optional[int]
@@ -157,6 +169,7 @@ def __init__(  # pylint: disable=too-many-arguments,too-many-locals,too-many-sta
         temp_end: Optional[float] = None,
         temp_step: Optional[float] = None,
         temp_time: Optional[float] = None,
+        post_process_kwargs: Optional[PostProcessKwargs] = None,
         log_kwargs: Optional[dict[str, Any]] = None,
         seed: Optional[int] = None,
     ) -> None:
@@ -231,6 +244,8 @@ def __init__(  # pylint: disable=too-many-arguments,too-many-locals,too-many-sta
             disables heating.
         temp_time : Optional[float]
             Time between heating steps, in fs. Default is None, which disables heating.
+        post_process_kwargs : Optional[PostProcessKwargs]
+            Keyword arguments to control post-processing operations.
         log_kwargs : Optional[dict[str, Any]]
             Keyword arguments to pass to log config. Default is None.
         seed : Optional[int]
@@ -262,6 +277,9 @@ def __init__(  # pylint: disable=too-many-arguments,too-many-locals,too-many-sta
         self.temp_end = temp_end
         self.temp_step = temp_step
         self.temp_time = temp_time * units.fs if temp_time else None
+        self.post_process_kwargs = (
+            post_process_kwargs if post_process_kwargs is not None else {}
+        )
         self.log_kwargs = log_kwargs
         self.ensemble = ensemble
         self.seed = seed
@@ -315,7 +333,7 @@ def __init__(  # pylint: disable=too-many-arguments,too-many-locals,too-many-sta
         self.minimize_kwargs = minimize_kwargs if minimize_kwargs else {}
 
         self.restart_files = []
-        self.dyn = None
+        self.dyn: Union[Langevin, VelocityVerlet, ASE_NPT]
         self.n_atoms = len(self.struct)
 
         self.stats_file = self._build_filename(
@@ -542,6 +560,60 @@ def _write_final_state(self) -> None:
             columns=["symbols", "positions", "momenta", "masses"],
         )
 
+    def _post_process(self) -> None:
+        """Compute properties after MD run."""
+        # Nothing to do
+        if not any(
+            self.post_process_kwargs.get(kwarg, None)
+            for kwarg in ("rdf_compute", "vaf_compute")
+        ):
+            return
+
+        data = read(self.traj_file, index=":")
+
+        if ASE_GEOMETRY:
+            ana = Analysis(data)
+        else:
+            ana = None
+
+        param_pref = self._parameter_prefix if self.file_prefix is None else ""
+
+        if self.post_process_kwargs.get("rdf_compute", False):
+            base_name = self.post_process_kwargs.get("rdf_output_file", None)
+            rdf_args = {
+                name: self.post_process_kwargs.get(key, default)
+                for name, (key, default) in (
+                    ("rmax", ("rdf_rmax", 2.5)),
+                    ("nbins", ("rdf_nbins", 50)),
+                    ("elements", ("rdf_elements", None)),
+                )
+            }
+            slice_ = (
+                self.post_process_kwargs.get("rdf_start", 0),
+                self.post_process_kwargs.get("rdf_stop", 1),
+                self.post_process_kwargs.get("rdf_step", 1),
+            )
+
+            out_paths = [
+                self._build_filename(
+                    "rdf.dat", param_pref, str(ind), prefix_override=base_name
+                )
+                for ind in range(*slice_)
+            ]
+
+            rdf_args["index"] = slice_
+            compute_rdf(data, ana, filename=out_paths, **rdf_args)
+
+        if self.post_process_kwargs.get("vaf_compute", False):
+
+            file_name = self.post_process_kwargs.get("vaf_output_file", None)
+            use_vel = self.post_process_kwargs.get("vaf_velocities", False)
+            fft = self.post_process_kwargs.get("vaf_fft", False)
+
+            out_path = self._build_filename("vaf.dat", param_pref, filename=file_name)
+
+            compute_vaf(data, out_path, use_velocities=use_vel, fft=fft)
+
     def _write_restart(self) -> None:
         """Write restart file and (optionally) rotate files saved."""
         step = self.offset + self.dyn.nsteps
@@ -594,6 +666,9 @@ def run(self) -> None:
         self.struct.info["real_time"] = datetime.datetime.now()
         self._run_dynamics()
 
+        if self.post_process_kwargs:
+            self._post_process()
+
     def _run_dynamics(self) -> None:
         """Run dynamics and/or temperature ramp."""
         # Store temperature for final MD
diff --git a/janus_core/cli/md.py b/janus_core/cli/md.py
index a283462b..638f26c0 100644
--- a/janus_core/cli/md.py
+++ b/janus_core/cli/md.py
@@ -14,6 +14,7 @@
     Device,
     LogPath,
     MinimizeKwargs,
+    PostProcessKwargs,
     ReadKwargs,
     StructPath,
     Summary,
@@ -168,6 +169,7 @@ def md(
     temp_time: Annotated[
         float, Option(help="Time between heating steps, in fs.")
     ] = None,
+    post_process_kwargs: PostProcessKwargs = None,
     log: LogPath = "md.log",
     seed: Annotated[
         Optional[int],
@@ -267,6 +269,8 @@ def md(
     temp_time : Optional[float]
         Time between heating steps, in fs. Default is None, which disables
         heating.
+    post_process_kwargs : Optional[PostProcessKwargs]
+        Keyword arguments to pass to post-processing.
     log : Optional[Path]
         Path to write logs to. Default is "md.log".
     seed : Optional[int]
@@ -280,8 +284,10 @@ def md(
     # Check options from configuration file are all valid
     check_config(ctx)
 
-    [read_kwargs, calc_kwargs, minimize_kwargs] = parse_typer_dicts(
-        [read_kwargs, calc_kwargs, minimize_kwargs]
+    [read_kwargs, calc_kwargs, minimize_kwargs, post_process_kwargs] = (
+        parse_typer_dicts(
+            [read_kwargs, calc_kwargs, minimize_kwargs, post_process_kwargs]
+        )
     )
 
     if not ensemble in get_args(Ensembles):
@@ -334,6 +340,7 @@ def md(
         "temp_end": temp_end,
         "temp_step": temp_step,
         "temp_time": temp_time,
+        "post_process_kwargs": post_process_kwargs,
         "log_kwargs": log_kwargs,
         "seed": seed,
     }
diff --git a/janus_core/cli/types.py b/janus_core/cli/types.py
index 93f6baaa..b94b836a 100644
--- a/janus_core/cli/types.py
+++ b/janus_core/cli/types.py
@@ -138,6 +138,21 @@ def __str__(self):
         ),
     ]
 
+PostProcessKwargs = Annotated[
+    TyperDict,
+    Option(
+        parser=parse_dict_class,
+        help=(
+            """
+            Keyword arguments to pass to post-processor. Must be passed as a dictionary
+            wrapped in quotes, e.g. "{'key' : value}".
+            """
+        ),
+        metavar="DICT",
+    ),
+]
+
+
 LogPath = Annotated[Path, Option(help="Path to save logs to.")]
 
 Summary = Annotated[
diff --git a/janus_core/helpers/janus_types.py b/janus_core/helpers/janus_types.py
index 4ab971e0..b87c7c36 100644
--- a/janus_core/helpers/janus_types.py
+++ b/janus_core/helpers/janus_types.py
@@ -43,13 +43,32 @@ class ASEWriteArgs(TypedDict, total=False):
 
 
 class ASEOptArgs(TypedDict, total=False):
-    """Main arugments for ase optimisers."""
+    """Main arguments for ase optimisers."""
 
     restart: Optional[bool]
     logfile: Optional[PathLike]
     trajectory: Optional[str]
 
 
+class PostProcessKwargs(TypedDict, total=False):
+    """Main arguments for MD post-processing."""
+
+    # RDF
+    rdf_compute: bool
+    rdf_rmax: float
+    rdf_nbins: int
+    rdf_elements: MaybeSequence[Union[str, int]]
+    rdf_start: int
+    rdf_stop: Optional[int]
+    rdf_step: int
+    rdf_output_file: Optional[str]
+    # VAF
+    vaf_compute: bool
+    vaf_velocities: bool
+    vaf_fft: bool
+    vaf_output_file: Optional[PathLike]
+
+
 # eos_names from ase.eos
 EoSNames = Literal[
     "sj",
diff --git a/janus_core/helpers/post_process.py b/janus_core/helpers/post_process.py
new file mode 100644
index 00000000..1df85111
--- /dev/null
+++ b/janus_core/helpers/post_process.py
@@ -0,0 +1,211 @@
+"""Module for post-processing trajectories."""
+
+from collections.abc import Sequence
+from itertools import combinations
+from typing import Any, Optional, Union
+from warnings import warn
+
+from ase import Atoms
+from ase.ga.utilities import get_rdf
+import numpy as np
+from numpy import float64
+from numpy.typing import NDArray
+
+from janus_core.helpers.janus_types import MaybeSequence, PathLike
+
+
+def compute_rdf(
+    data: MaybeSequence[Atoms],
+    ana: Optional["ase.geometry.analysis.Analysis"] = None,
+    /,
+    *,
+    filename: Optional[MaybeSequence[PathLike]] = None,
+    by_elements: bool = False,
+    rmax: float = 2.5,
+    nbins: int = 50,
+    elements: MaybeSequence[Union[int, str]] = None,
+    index: tuple[int, Optional[int], int] = (0, None, 1),
+) -> Union[NDArray[float64], dict[tuple[str, str], NDArray[float64]]]:
+    """
+    Compute the RDF of `data`.
+
+    Parameters
+    ----------
+    data : MaybeSequence[Atoms]
+        Dataset to compute RDF of.
+    ana : Optional[ase.geometry.analysis.Analysis]
+        ASE Analysis object for data reuse.
+    filename : Optional[MaybeSequence[PathLike]]
+        Filename(s) to output data to. Must match number of RDFs computed.
+    by_elements : bool
+        Split the RDF into pairwise RDFs by element pairs.
+    rmax : float
+        Maximum distance of RDF.
+    nbins : int
+        Number of bins to divide RDF.
+    elements : MaybeSequence[Union[int, str]]
+        Compute partial RDFs. If `by_elements` is true, only the element
+        pairs listed are included.
+    index : tuple[int, Optional[int], int]
+        Images to analyze as `start`, `stop`, `step`.
+
+    Returns
+    -------
+    Union[NDArray[float64], dict[tuple[str, str], NDArray[float64]]]
+        If `by_elements` is true returns a `dict` of RDF by element pairs.
+        Otherwise returns RDF of total system filtered by elements.
+    """
+
+    if ana is not None:
+
+        def calc_rdf(
+            rmax: float,
+            nbins: int,
+            elements: MaybeSequence[Union[str, int]],
+            index: tuple[int, Optional[int], int],
+        ):
+            """
+            Wrapper to compute RDF.
+
+            Parameters
+            ----------
+            rmax : float
+                Maximum distance of RDF.
+            nbins : int
+                Number of bins to divide RDF.
+            elements : MaybeSequence[Union[int, str]]
+                Make partial RDFs.
+            index : tuple[int, Optional[int], int]
+                Images to analyze as `start`, `stop`, `step`.
+            """
+            return ana.get_rdf(
+                rmax=rmax,
+                nbins=nbins,
+                elements=elements,
+                imageIdx=slice(*index),
+                return_dists=True,
+            )
+
+    else:
+        if index != (0, None, 1):
+            warn(
+                "Not using ase.geometry.analysis.Analysis object; "
+                "`index` will be ignored"
+            )
+
+        def calc_rdf(
+            rmax: float, nbins: int, elements: MaybeSequence[Union[str, int]], _: Any
+        ):
+            """
+            Wrapper to compute RDF.
+
+            Parameters
+            ----------
+            rmax : float
+                Maximum distance of RDF.
+            nbins : int
+                Number of bins to divide RDF.
+            elements : MaybeSequence[Union[int, str]]
+                Compute partial RDFs. If `by_elements` is true, only the element
+                pairs listed are included.
+            _ : Any
+                Unused; preserves the interface of the Analysis-based wrapper.
+            """
+            return get_rdf(data, rmax=rmax, nbins=nbins, elements=elements)
+
+    def dump_rdf(full_rdf: NDArray[float64], filename: PathLike) -> None:
+        """
+        Write RDF to file.
+
+        Parameters
+        ----------
+        full_rdf : NDArray[float64]
+            RDF to dump to file.
+        filename : PathLike
+            File(s) to dump to.
+        """
+        for (rdfs, dists), out_path in zip(full_rdf, filename):
+            with open(out_path, "w", encoding="utf-8") as out_file:
+                for rdf, dist in zip(rdfs, dists):
+                    print(dist, rdf, file=out_file)
+
+    if not isinstance(data, Sequence):
+        data = (data,)
+
+    if by_elements:
+        elements = set(data[0].get_chemical_symbols()) if elements is None else elements
+
+        rdf = {
+            element: calc_rdf(rmax, nbins, element, index)
+            for element in combinations(elements, 2)
+        }
+    else:
+        rdf = calc_rdf(rmax, nbins, elements, index)
+
+    if filename is not None:
+        if not isinstance(filename, Sequence):
+            filename = (filename,)
+
+        if by_elements:
+            for full_rdf in rdf.values():
+                dump_rdf(full_rdf, filename)
+        else:
+            dump_rdf(rdf, filename)
+
+    return rdf
+
+
+def compute_vaf(
+    data: Sequence[Atoms],
+    filename: Optional[PathLike] = None,
+    *,
+    use_velocities: bool = False,
+    fft: bool = False,
+) -> NDArray[float64]:
+    """
+    Compute the velocity autocorrelation function (VAF) of `data`.
+
+    Parameters
+    ----------
+    data : Sequence[Atoms]
+        Dataset to compute VAF of.
+    filename : Optional[PathLike]
+        If present, dump resultant VAF to file.
+    use_velocities : bool
+        Compute VAF using velocities rather than momenta.
+    fft : bool
+        Compute the Fourier transformed VAF.
+
+    Returns
+    -------
+    NDArray[float64]
+        Computed VAF.
+ """ + if use_velocities: + momenta = np.asarray([datum.get_momenta() / datum.get_masses() for datum in data]) + else: + momenta = np.asarray([datum.get_momenta() for datum in data]) + + n_steps = len(momenta) + n_atoms = len(momenta[0]) + + vaf = np.sum( + np.asarray( + [[np.correlate(momenta[:, j, i], momenta[:, j, i], "same") + for i in range(3)] + for j in range(n_atoms)] + ), + axis=1, + ) + + vaf /= n_steps - np.arange(n_steps) + + if fft: + vaf = np.fft.fft(vaf) + + if filename is not None: + with open(filename, "w", encoding="utf-8") as out_file: + for elem in vaf: + print(elem, file=out_file) + + return vaf diff --git a/tests/test_md.py b/tests/test_md.py index e7ab46a4..0f632c82 100644 --- a/tests/test_md.py +++ b/tests/test_md.py @@ -39,19 +39,22 @@ def test_init(ensemble, expected): assert dyn.ensemble == expected -def test_npt(): +def test_npt(): # pylint: disable=too-many-locals """Test NPT molecular dynamics.""" restart_path_1 = Path("Cl4Na4-npt-T300.0-p1.0-res-2.xyz") restart_path_2 = Path("Cl4Na4-npt-T300.0-p1.0-res-4.xyz") restart_final = Path("Cl4Na4-npt-T300.0-p1.0-final.xyz") traj_path = Path("Cl4Na4-npt-T300.0-p1.0-traj.xyz") stats_path = Path("Cl4Na4-npt-T300.0-p1.0-stats.dat") + rdf_path = Path("Cl4Na4-npt-0-rdf.dat") + vaf_path = Path("Cl4Na4-npt-vaf.dat") assert not restart_path_1.exists() assert not restart_path_2.exists() assert not restart_final.exists() assert not traj_path.exists() assert not stats_path.exists() + assert not rdf_path.exists() single_point = SinglePoint( struct_path=DATA_PATH / "NaCl.cif", @@ -66,11 +69,15 @@ def test_npt(): traj_every=1, restart_every=2, stats_every=1, + post_process_kwargs=dict( + rdf_compute=True, + rdf_rmax=2.5, + vaf_compute=True, + ), ) try: npt.run() - restart_atoms_1 = read(restart_path_1) assert isinstance(restart_atoms_1, Atoms) restart_atoms_2 = read(restart_path_2) @@ -81,6 +88,14 @@ def test_npt(): assert all(isinstance(image, Atoms) for image in traj) assert len(traj) == 4 + assert rdf_path.exists() + rdf = np.loadtxt(rdf_path) + assert len(rdf) == 50 + # Cell too small to really compute RDF + assert np.all(rdf[:, 1] == 0) + + assert vaf_path.exists() + with open(stats_path, encoding="utf8") as stats_file: lines = stats_file.readlines() assert "Target P [bar] | Target T [K]" in lines[0] @@ -91,6 +106,8 @@ def test_npt(): restart_final.unlink(missing_ok=True) traj_path.unlink(missing_ok=True) stats_path.unlink(missing_ok=True) + rdf_path.unlink(missing_ok=True) + vaf_path.unlink(missing_ok=True) def test_nvt_nh():