Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelizes MDAnalysis.analysis.InterRDF and MDAnalysis.analysis.InterRDF_s #4884

Open
wants to merge 18 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion package/CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,13 @@ Enhancements
(Issue #4677, PR #4729)
* Enables parallelization for analysis.contacts.Contacts (Issue #4660)
* Enable parallelization for analysis.nucleicacids.NucPairDist (Issue #4670)
* Add check and warning for empty (all zero) coordinates in RDKit converter (PR #4824)
* Added `precision` for XYZWriter (Issue #4775, PR #4771)
* Parallelize `analysis.rdf.InterRDF` and `analysis.rdf.InterRDF` (Issue #4675)
* Added `precision` for XYZWriter (Issue #4775, PR #4771)
* Add check and warning for empty (all zero) coordinates in RDKit converter
(PR #4824)
* Added `precision` for XYZWriter (Issue #4775, PR #4771)
* Added `precision` for XYZWriter (Issue #4775, PR #4771)

Changes
* MDAnalysis.analysis.psa, MDAnalysis.analysis.waterdynamics and
Expand Down
95 changes: 90 additions & 5 deletions package/MDAnalysis/analysis/rdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,40 @@
import numpy as np

from ..lib import distances
from .base import AnalysisBase
from .base import AnalysisBase, ResultsGroup


def flatten_sum_alternate(arrs):
r"""Custom aggregator for nested arrays

This function takes a nested list or tuple of NumPy arrays, flattens it into a single list,
and aggregates the elements at alternating indices into two separate arrays. The first
array accumulates elements at even indices, while the second accumulates elements at odd indices.

Parameters
----------
arrs : list
List of arrays or nested lists of arrays

Returns
-------
list of ndarray
A list containing two NumPy arrays:
- The first array is the sum of all elements at even indices in the sum of flattened arrays.
- The second array is the sum of all elements at odd indices in the sum of flattened arrays.
"""

def flatten(arr):
if isinstance(arr, (list, tuple)):
return [item for sublist in arr for item in flatten(sublist)]
return [arr]

flat = flatten(arrs)
aggregated_arr = [np.zeros_like(flat[0]), np.zeros_like(flat[1])]
for i in range(len(flat) // 2):
aggregated_arr[0] += flat[2 * i] # 0, 2, 4, ...
aggregated_arr[1] += flat[2 * i + 1] # 1, 3, 5, ...
return aggregated_arr


class InterRDF(AnalysisBase):
Expand Down Expand Up @@ -216,8 +249,23 @@ class InterRDF(AnalysisBase):
Store results as attributes `bins`, `edges`, `rdf` and `count`
of the `results` attribute of
:class:`~MDAnalysis.analysis.AnalysisBase`.

.. versionchanged:: 2.9.0
Enabled **parallel execution** with the ``multiprocessing`` and ``dask``
backends; use the new method :meth:`get_supported_backends` to see all
supported backends.
"""

@classmethod
def get_supported_backends(cls):
return (
"serial",
"multiprocessing",
"dask",
)

_analysis_algorithm_is_parallelizable = True

def __init__(
self,
g1,
Expand Down Expand Up @@ -273,7 +321,7 @@ def _prepare(self):

if self.norm == "rdf":
# Cumulative volume for rdf normalization
self.volume_cum = 0
self.results.volume_cum = 0
# Set the max range to filter the search radius
self._maxrange = self.rdf_settings["range"][1]

Expand Down Expand Up @@ -302,7 +350,17 @@ def _single_frame(self):
self.results.count += count

if self.norm == "rdf":
self.volume_cum += self._ts.volume
self.results.volume_cum += self._ts.volume

def _get_aggregator(self):
return ResultsGroup(
lookup={
"count": ResultsGroup.ndarray_sum,
"volume_cum": ResultsGroup.ndarray_sum,
"bins": ResultsGroup.ndarray_sum,
"edges": ResultsGroup.ndarray_mean,
}
)

def _conclude(self):
norm = self.n_frames
Expand All @@ -324,6 +382,7 @@ def _conclude(self):
N -= xA * xB * nblocks

# Average number density
self.volume_cum = self.results.volume_cum
box_vol = self.volume_cum / self.n_frames
norm *= N / box_vol

Expand Down Expand Up @@ -561,8 +620,33 @@ class InterRDF_s(AnalysisBase):
Instead of `density=True` use `norm='density'`
.. deprecated:: 2.3.0
The `universe` parameter is superflous.

.. versionchanged:: 2.9.0
Enabled **parallel execution** with the ``multiprocessing`` and ``dask``
backends; use the new method :meth:`get_supported_backends` to see all
supported backends.
"""

@classmethod
def get_supported_backends(cls):
return (
"serial",
"multiprocessing",
"dask",
)

_analysis_algorithm_is_parallelizable = True

def _get_aggregator(self):
return ResultsGroup(
lookup={
"count": flatten_sum_alternate,
"volume_cum": ResultsGroup.ndarray_sum,
"bins": ResultsGroup.ndarray_mean,
"edges": ResultsGroup.ndarray_mean,
}
)

def __init__(
self,
u,
Expand Down Expand Up @@ -614,7 +698,7 @@ def _prepare(self):

if self.norm == "rdf":
# Cumulative volume for rdf normalization
self.volume_cum = 0
self.results.volume_cum = 0
self._maxrange = self.rdf_settings["range"][1]

def _single_frame(self):
Expand All @@ -631,7 +715,7 @@ def _single_frame(self):
self.results.count[i][idx1, idx2, :] += count

if self.norm == "rdf":
self.volume_cum += self._ts.volume
self.results.volume_cum += self._ts.volume

def _conclude(self):
norm = self.n_frames
Expand All @@ -642,6 +726,7 @@ def _conclude(self):

if self.norm == "rdf":
# Average number density
self.volume_cum = self.results.volume_cum
norm *= 1 / (self.volume_cum / self.n_frames)

# Empty lists to restore indices, RDF
Expand Down
11 changes: 11 additions & 0 deletions testsuite/MDAnalysisTests/analysis/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from MDAnalysis.analysis.nucleicacids import NucPairDist
from MDAnalysis.analysis.contacts import Contacts
from MDAnalysis.analysis.density import DensityAnalysis
from MDAnalysis.analysis.rdf import InterRDF, InterRDF_s
from MDAnalysis.lib.util import is_installed


Expand Down Expand Up @@ -176,3 +177,13 @@ def client_Contacts(request):
@pytest.fixture(scope="module", params=params_for_cls(DensityAnalysis))
def client_DensityAnalysis(request):
return request.param


@pytest.fixture(scope="module", params=params_for_cls(InterRDF))
def client_InterRDF(request):
return request.param


@pytest.fixture(scope="module", params=params_for_cls(InterRDF_s))
def client_InterRDF_s(request):
return request.param
72 changes: 48 additions & 24 deletions testsuite/MDAnalysisTests/analysis/test_rdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,83 +48,85 @@ def sels(u):
return s1, s2


def test_nbins(u):
def test_nbins(u, client_InterRDF):
s1 = u.atoms[:3]
s2 = u.atoms[3:]
rdf = InterRDF(s1, s2, nbins=412).run()
rdf = InterRDF(s1, s2, nbins=412).run(**client_InterRDF)

assert len(rdf.results.bins) == 412


def test_range(u):
def test_range(u, client_InterRDF):
s1 = u.atoms[:3]
s2 = u.atoms[3:]
rmin, rmax = 1.0, 13.0
rdf = InterRDF(s1, s2, range=(rmin, rmax)).run()
rdf = InterRDF(s1, s2, range=(rmin, rmax)).run(**client_InterRDF)

assert rdf.results.edges[0] == rmin
assert rdf.results.edges[-1] == rmax


def test_count_sum(sels):
def test_count_sum(sels, client_InterRDF):
# OW vs HW
# should see 8 comparisons in count
s1, s2 = sels
rdf = InterRDF(s1, s2).run()
rdf = InterRDF(s1, s2).run(**client_InterRDF)
assert rdf.results.count.sum() == 8


def test_count(sels):
def test_count(sels, client_InterRDF):
# should see two distances with 4 counts each
s1, s2 = sels
rdf = InterRDF(s1, s2).run()
rdf = InterRDF(s1, s2).run(**client_InterRDF)
assert len(rdf.results.count[rdf.results.count == 4]) == 2


def test_double_run(sels):
def test_double_run(sels, client_InterRDF):
# running rdf twice should give the same result
s1, s2 = sels
rdf = InterRDF(s1, s2).run()
rdf.run()
rdf = InterRDF(s1, s2).run(**client_InterRDF)
rdf.run(**client_InterRDF)
assert len(rdf.results.count[rdf.results.count == 4]) == 2


def test_exclusion(sels):
def test_exclusion(sels, client_InterRDF):
# should see two distances with 4 counts each
s1, s2 = sels
rdf = InterRDF(s1, s2, exclusion_block=(1, 2)).run()
rdf = InterRDF(s1, s2, exclusion_block=(1, 2)).run(**client_InterRDF)
assert rdf.results.count.sum() == 4


@pytest.mark.parametrize(
"attr, count", [("residue", 8), ("segment", 0), ("chain", 8)]
)
def test_ignore_same_residues(sels, attr, count):
def test_ignore_same_residues(sels, attr, count, client_InterRDF):
# should see two distances with 4 counts each
s1, s2 = sels
rdf = InterRDF(s2, s2, exclude_same=attr).run()
rdf = InterRDF(s2, s2, exclude_same=attr).run(**client_InterRDF)
assert rdf.rdf[0] == 0
assert rdf.results.count.sum() == count


def test_ignore_same_residues_fails(sels):
def test_ignore_same_residues_fails(sels, client_InterRDF):
s1, s2 = sels
with pytest.raises(
ValueError, match="The exclude_same argument to InterRDF must be"
):
InterRDF(s2, s2, exclude_same="unsupported").run()
InterRDF(s2, s2, exclude_same="unsupported").run(**client_InterRDF)

with pytest.raises(
ValueError,
match="The exclude_same argument to InterRDF cannot be used with",
):
InterRDF(s2, s2, exclude_same="residue", exclusion_block=tuple()).run()
InterRDF(s2, s2, exclude_same="residue", exclusion_block=tuple()).run(
**client_InterRDF
)


@pytest.mark.parametrize("attr", ("rdf", "bins", "edges", "count"))
def test_rdf_attr_warning(sels, attr):
def test_rdf_attr_warning(sels, attr, client_InterRDF):
s1, s2 = sels
rdf = InterRDF(s1, s2).run()
rdf = InterRDF(s1, s2).run(**client_InterRDF)
wmsg = f"The `{attr}` attribute was deprecated in MDAnalysis 2.0.0"
with pytest.warns(DeprecationWarning, match=wmsg):
getattr(rdf, attr) is rdf.results[attr]
Expand All @@ -133,22 +135,44 @@ def test_rdf_attr_warning(sels, attr):
@pytest.mark.parametrize(
"norm, value", [("density", 1.956823), ("rdf", 244602.88385), ("none", 4)]
)
def test_norm(sels, norm, value):
def test_norm(sels, norm, value, client_InterRDF):
s1, s2 = sels
rdf = InterRDF(s1, s2, norm=norm).run()
rdf = InterRDF(s1, s2, norm=norm).run(**client_InterRDF)
assert_allclose(max(rdf.results.rdf), value)


@pytest.mark.parametrize(
"norm, norm_required", [("Density", "density"), (None, "none")]
)
def test_norm_values(sels, norm, norm_required):
def test_norm_values(sels, norm, norm_required, client_InterRDF):
s1, s2 = sels
rdf = InterRDF(s1, s2, norm=norm).run()
rdf = InterRDF(s1, s2, norm=norm).run(**client_InterRDF)
assert rdf.norm == norm_required


def test_unknown_norm(sels):
s1, s2 = sels
with pytest.raises(ValueError, match="invalid norm"):
InterRDF(s1, s2, sels, norm="foo")

# tests for parallelization

@pytest.mark.parametrize(
"classname,is_parallelizable",
[
(mda.analysis.rdf.InterRDF, True),
]
)
def test_class_is_parallelizable(classname, is_parallelizable):
assert classname._analysis_algorithm_is_parallelizable == is_parallelizable


@pytest.mark.parametrize(
"classname,backends",
[
(mda.analysis.rdf.InterRDF,
('serial', 'multiprocessing', 'dask',)),
]
)
def test_supported_backends(classname, backends):
assert classname.get_supported_backends() == backends
Loading
Loading