[GSoC] Parallelisation of AnalysisBase with multiprocessing and dask #4162

Merged
merged 288 commits into from
Aug 16, 2024
Changes from 39 commits
c58da9f
Remove _scheduler attribute and make dask-based tests run properly
Jun 15, 2023
1061a80
Refactor scheduler usage
Jun 20, 2023
2d7a56c
Add multiple workers in dask for testing
Jun 20, 2023
3d1cf80
Refactor _setup_bslices and add processes to dask scheduler kwargs
Jun 21, 2023
bb31a32
Create frame_indices and trajectory for each bslice during _setup_bsl…
Jun 22, 2023
eb1feb1
Use explicit initialisation of timeseries with zeros
Jun 22, 2023
4e11bcf
Add non-trivial _parallel_conclude function
Jun 22, 2023
bc295c8
Fix tests for new dask fixture
Jun 22, 2023
75d1707
Add type-matching _parallel_conclude
Jun 23, 2023
c5bd0b5
Add fixtures to test combinations of dask and multiprocessing
Jun 25, 2023
556c309
dask and multiprocessing works in test_atomicdistances.py
Jun 25, 2023
83630b4
Fix bug in results is np.ndarray codepath
Jun 25, 2023
e41cba3
Add _setup_scheduler raising NotImplemented error in align.py::Avera…
Jun 25, 2023
1659323
dask and multiprocessing schedulers to test_align.py
Jun 25, 2023
f9a8072
dask scheduler for test_contacts.py and test for incompatibility with…
Jun 25, 2023
f975ecf
dask and multiprocessing scheduler for test_density.py
Jun 25, 2023
cf0253d
Add _parallel_conclude implementation for dielectric
Jun 25, 2023
8b0f4fb
dask and multiprocessing schedulers for test_dielectric.py
Jun 25, 2023
cc25c65
dask and multiprocessing schedulers for test_diffusionmap.py
Jun 25, 2023
4aa005b
Add NotImplementedError for parallel schedulers in dihedrals.py
Jun 25, 2023
4c1a73a
only current scheduler for test_dihedrals.py
Jun 25, 2023
b76d6ff
dask and multiprocessing tests for test_encore.py -- but some fail be…
Jun 25, 2023
502fb84
Add NotImplementedError for _setup_scheduler in gnm.py
Jun 25, 2023
49d1742
Add NotImplementedError for _setup_scheduler in helix_analysis.py
Jun 25, 2023
90a318d
current process scheduler for test_helix_analysis.py
Jun 25, 2023
3cc1dff
dask and multiprocessing schedulers for test_hole2.py
Jun 25, 2023
33e8b4c
Add NotImplementedError in for not-None schedulers
Jun 25, 2023
1058ed3
current process scheduler and test for failing non-current ones in te…
Jun 25, 2023
b49a8ba
current process only scheduler and failing test for others in test_li…
Jun 25, 2023
4e08384
Add NotImplementedError for non-current process schedulers
Jun 25, 2023
274a6ff
current process scheduler only and failing tests for non-current ones…
Jun 25, 2023
664e378
Add NotImplementedError for non-current process schedulers
Jun 25, 2023
e8eeba7
Fix scope of fixtures
Jun 25, 2023
42e34dc
Add NotImplemented error for all non-current process schedulers
Jun 25, 2023
38fe4ea
only current process scheduler and failing tests for test_nucleicacid…
Jun 25, 2023
b9b651d
dask and multiprocessing schedulers for test_persistentlength.py
Jun 25, 2023
0c151a1
Add _parallel_conclude implementation
Jun 25, 2023
33af089
dask and multiprocessing schedulers for test_psa.py
Jun 25, 2023
a79229d
Add _parallel_conclude implementation for RDF and RDF_S
Jun 25, 2023
a788358
dask and multiprocessing schedulers for test_rdf_s.py
Jun 25, 2023
6fe92c2
dask and multiprocessing schedulers for test_rdf.py
Jun 25, 2023
4c0937e
Add NotImplementedError for RMSD and RMSF classes
Jun 25, 2023
c40e9e1
only local process scheduler and failing tests for others for test_rm…
Jun 25, 2023
68795b8
current process scheduler only and failing test for others for test_w…
Jun 25, 2023
419a97a
Add NotImplementedError in _setup_scheduler
Jun 25, 2023
bab8e81
Add more clear message during exception
Jun 25, 2023
641170f
Add timeseries aggregation function
Jun 29, 2023
f31792a
dask and multiprocessing scheduler for most of the test_base.py testc…
Jun 29, 2023
4a10112
dask and multiprocessing schedulers for test_rms.py::TestRMSD
Jun 29, 2023
c8187ae
Add NotImplementedError for pca and rms
Jun 29, 2023
ac035eb
dask and multiprocessing schedulers for test_bat
Jun 29, 2023
076b4ce
current process scheduler for test_gnm.py
Jun 29, 2023
0dffdc8
current process scheduler for test_pca.py
Jun 29, 2023
d9429a8
Fix rmsf-related scheduler usage to only current process scheduler
Jun 29, 2023
767388f
remove fixme marks
Jun 29, 2023
1780468
Switch to enumerate in _compute main loop and fix code review comments
Jun 30, 2023
00593c0
Add dask to CI setup actions
Jul 2, 2023
6603173
Remove local scheduler for progressbar test
Jul 2, 2023
fd33788
Add installation with dask as a setup option
Jul 2, 2023
2a3b2f2
fix hole2 tests for -- implement only current scheduler and add faili…
Jul 2, 2023
268eada
fix progressbar test by changing order of ProgressBar and enumerate
Jul 3, 2023
4a019af
use only frame indices and frames in _setup_bslices after writing a b…
Jul 4, 2023
4877125
Refactor _setup_bslices: move enumerate to numpy and fuse logic in de…
Jul 4, 2023
9d9d918
Add documentation to AnalysisBase._parallel_conclude()
Jul 4, 2023
81a8df4
add functional-like interface draft
Jul 20, 2023
1430a84
Implement proper Client class, separating computations from AnalysisBase
Jul 23, 2023
48f094c
FINALLY implement working one-time dask cluster setup in kwargs of a …
Jul 23, 2023
110b589
Correct tests accordingly
Jul 23, 2023
d9d63d1
Separately process case of only one remote worker
Jul 24, 2023
60e4ea4
Add available_schedulers to AverageStructure
Jul 24, 2023
a91dacc
Use automatic fixture for AverageStructure
Jul 24, 2023
247d870
Add fixture for AverageStructure
Jul 24, 2023
df76f91
Add fixture for AtomicDistances
Jul 24, 2023
17c29d7
Change default available_backends to all implemented in Client
Jul 24, 2023
fc6d44d
Limit available backends for AverageStructure
Jul 24, 2023
fc74f5c
Add fixture for BAT
Jul 24, 2023
284d7c0
Add fixture tests to Contacts
Jul 24, 2023
ee91c1b
Fix n_workers check and boolean frames handling
Jul 24, 2023
b2fdd41
Fix performance of backend="dask"
Jul 27, 2023
eab7136
Add available_backends for Contacts
Jul 27, 2023
180569a
Remove _setup_scheduler
Jul 27, 2023
ba2246a
Use client fixture for Contacts
Jul 27, 2023
5c08885
Use client fixture for RMSD/RMSF
Jul 27, 2023
5c7b750
Revert files to their state in develop
Jul 27, 2023
49c9dcb
Delete files_for_undoing_changes.txt
Jul 27, 2023
62760c9
Delete conftest.py
Jul 27, 2023
90e99a5
Delete parallel_analysis_demo.ipynb
Jul 27, 2023
39f324a
Clean up notebook
Jul 27, 2023
7d7b5e5
remove notebook
Jul 27, 2023
787309b
Limit available schedulers in RMSF
Jul 27, 2023
77cff6d
Split test in two due to failing with "expectation" parametrization
Jul 27, 2023
2c583c4
Add fixture generator and fixtures for test_base and test_rms
Jul 27, 2023
76c59d3
Add dask to pyproject.toml
Jul 27, 2023
ba30774
Return computation groups explicitly
Jul 27, 2023
ac0b4a3
Merge branch 'develop' of https://github.com/MDAnalysis/mdanalysis in…
Aug 1, 2023
6e76520
Fix dask position in setup-deps/action.yaml
Aug 2, 2023
a5d24a3
Add dask[distributed] to mdanalysis[parallel] installation
Aug 2, 2023
2417d87
Undo autoformatter
Aug 2, 2023
1af07d3
Manually define available_backends for RMSD class
Aug 6, 2023
38a81db
Create separate "parallel" entry
Aug 6, 2023
d956159
Add is_installed function to utils
Aug 6, 2023
9e5e5ad
Add dict-based validation and computation logic for ParallelExecutor
Aug 6, 2023
fed5d9d
Add tests for ParallelExecutor
Aug 6, 2023
966ceca
Add documentation for "apply" method of ParallelExecutor
Aug 6, 2023
dd1fe28
Correct dask.distributed name
Aug 6, 2023
5643de1
Use chunksize=1 instead of explicit Pool in _compute_with_dask
Aug 6, 2023
14c5c53
Remove unnecessary function in conftest
Aug 6, 2023
477f08d
Fix function to retrieve dask client if dask is not installed
Aug 6, 2023
8c6738c
Fix base tests when dask is not installed
Aug 6, 2023
144b909
Use new LocalCluster every time
Aug 8, 2023
3224f28
Fix client/backend logic
Aug 8, 2023
f1da39e
Add documentation to a silly square function
Aug 8, 2023
a0ed309
Switch to package-wise autouse fixture for dask.distributed.Client
Aug 8, 2023
e8625c0
Add explicit result() when computing with cluster
Aug 8, 2023
0ba8407
Fix codereview
Aug 17, 2023
6206760
Replace list with tuple in available_backends for RMSD
Aug 17, 2023
2e101cd
Remove unnecessary get_running_dask_client
Aug 17, 2023
b6cb101
Implement fixture injection for subclasses testing
Aug 17, 2023
1d547f7
Add warnings filters
Aug 17, 2023
34ec5fc
Fix backend check when client is present
Aug 17, 2023
0ecec69
Return get_runnning_dask_client function
Aug 17, 2023
81b7b71
Change dask fixture scope
Aug 18, 2023
540cd26
Close LocalCluster to avoid trillions of logs
Aug 18, 2023
5a39a1a
Implement ResultsGroup based aggregation instead of type matching
Aug 18, 2023
7695dde
Add non-default _get_aggregator() to RMS and Base classes
Aug 18, 2023
074f1b2
Mark test_multiprocessing.py::test_creating_multiple_universe_without…
Aug 19, 2023
94cbefd
Restore failing test
Aug 19, 2023
7f53672
Make aggregation functions static methods of ResultsGroup
Aug 21, 2023
0ca5c5f
Merge branch 'feature/dask-0' of github.com:marinegor/mdanalysis into…
Aug 21, 2023
72ece49
Remove test skip
Aug 21, 2023
ef95b04
Move parallel part into a separate file
Aug 24, 2023
6d37652
Fix imports
Aug 31, 2023
6c26771
Proof of concept for duck-typed backends
Sep 5, 2023
5cd0ab0
Remove unused code
Sep 5, 2023
165174c
Replace ParallelExecutor with multiple backend classes and add duck-t…
Sep 12, 2023
0a72d04
Add all tests for analysis/parallel.py and fix bug in ResultsGroup.nd…
Sep 12, 2023
a168266
Change typing to py3.9 compatible syntax
Sep 13, 2023
5683245
Add _is_parallelizable to AnalysisFromFunction
Sep 13, 2023
28b67f8
Remove dask[distributed] even as an optional dependency
Sep 13, 2023
44a4600
Update documentation
Sep 13, 2023
bf3fb06
Remove function to get running dask client
Sep 13, 2023
ced8a04
Remove unused code from analysis/conftest.py
Sep 13, 2023
a399f67
Fix documentation and minor issues from codereview
Sep 13, 2023
eafa51d
Update package/MDAnalysis/analysis/rms.py
Sep 13, 2023
4ba84a8
Merge branch 'feature/dask-0' of github.com:marinegor/mdanalysis into…
Sep 13, 2023
a936b4a
Add more backend validation tests and fix autoformatter issues
Sep 13, 2023
9641dc5
Start implementing correct result sizes in separate computation groups
Sep 15, 2023
9dda941
Continue working: diffusionmap and PCA tests fail
Sep 15, 2023
2d9ca29
Merge remote-tracking branch 'upstream/develop' into feature/appropri…
marinegor Jan 10, 2024
211cbcf
Fix bug in PCA trajectory iteration -- avoid explicit usage of self.s…
marinegor Jan 12, 2024
d4de910
update changelog and tests for PCA fix
marinegor Jan 12, 2024
a58e8e1
Merge branch 'bugfix/pca-frames-iteration' into feature/appropriate-s…
marinegor Jan 12, 2024
856f65c
Fix diffusionmap and pca
marinegor Jan 12, 2024
1058e00
Make sure not to reset self.{start,stop,step} during self._compute
marinegor Jan 12, 2024
24a11c3
Change iteration pattern to sliced trajectory
marinegor Jan 12, 2024
22987c7
Change iteration pattern to sliced trajectory
marinegor Jan 12, 2024
cc033a1
Update package/MDAnalysis/analysis/parallel.py
Jan 12, 2024
eb43f4f
Apply suggestions from code review
Jan 12, 2024
db3d8bc
Split _setup_frames into two separate functions
marinegor Jan 12, 2024
1107ccc
Merge branch 'feature/appropriate-sized-results-in-parallel' into fea…
marinegor Jan 12, 2024
bb53e3f
Merge branch 'feature/analysisbase-code-deduplication' into feature/p…
marinegor Jan 12, 2024
44bcd89
Add docstrings for _prepare_sliced_trajectory and _define_run_frames
marinegor Jan 12, 2024
0be8ab9
Remove dask-distributed from dependencies
marinegor Jan 12, 2024
25ef3c7
Test only 2 processors with parallelizable backends
marinegor Jan 12, 2024
633505f
Rename available_backends and safe
marinegor Jan 12, 2024
92bad39
Apply codereview changes
marinegor Jan 12, 2024
6bb1779
Make tests for AnalysisBase subclasses explicit
marinegor Jan 13, 2024
bf96b67
Exclude "multiprocessing" from analysis_class function available back…
marinegor Jan 13, 2024
102e91a
Split parallel.py into results.py and parallel.py
marinegor Jan 13, 2024
83a552c
Finalize separation of results and backends
marinegor Jan 13, 2024
6a685fd
Rename parallel.py to backends.py
marinegor Jan 14, 2024
e8b080d
Add results and backends to analysis/__init__.py
marinegor Jan 14, 2024
514688b
Fix pep8 errors in docstrings and code
marinegor Jan 14, 2024
ebcd4ca
Add versionadded to documentation
marinegor Jan 14, 2024
fcdf330
Merge branch 'feature/pep8-parallelization' into feature/parallelization
marinegor Jan 17, 2024
f529a41
Update sphinx documentation with backends and results
marinegor Jan 17, 2024
bf07a0f
Add parallelization reference to base.py
marinegor Jan 17, 2024
3408a54
Switch to relative imports
marinegor Jan 17, 2024
39c9560
Update documentation, adding introduced changes
marinegor Jan 17, 2024
53e00e8
Update documentation adding parallelization support for rms
marinegor Jan 17, 2024
2fb504b
Add module documentation to results and backends
marinegor Jan 17, 2024
0f240a5
Fix merge conflicts
marinegor Jan 21, 2024
abe711d
Fix BackendSerial validation and add its tests
marinegor Jan 22, 2024
8a59a75
Fix calling of self._is_paralellizable()
marinegor Jan 22, 2024
b87152e
Add tests on is_parallelizable and get_supported_backends
marinegor Jan 22, 2024
ff508de
Fix bug with default progressbar_kwargs being dict
marinegor Jan 24, 2024
d045225
Merge branch 'develop' into feature/dask-0
RMeli Jan 30, 2024
b9e8e53
Apply suggestions from code review
Jan 31, 2024
a53df61
Add docstrings to apply() in backends
marinegor Feb 7, 2024
4ea030c
Add double n_worker check
marinegor Feb 7, 2024
656e461
Apply suggestions from code review
Feb 7, 2024
552aab4
Merge with develop
marinegor Feb 7, 2024
ea7b0c9
Merge remote-tracking branch 'origin/feature/dask-0' into feature/par…
marinegor Feb 7, 2024
b7de11c
Fix hasattr in double n_worker check
marinegor Feb 7, 2024
536a197
Revert test `with expectation` in test_align
marinegor Feb 7, 2024
04705d7
Merge remote-tracking branch 'origin/feature/dask-1' into feature/par…
marinegor Feb 17, 2024
a71c809
Merge remote-tracking branch 'upstream/develop' into feature/parallel…
marinegor Feb 21, 2024
c94b5bf
Update testsuite/MDAnalysisTests/analysis/test_pca.py
Feb 21, 2024
a67f5d3
Update package/MDAnalysis/lib/util.py
Feb 21, 2024
12b255b
Update changelog
marinegor Feb 21, 2024
eaa4c30
Merge remote-tracking branch 'origin/feature/dask-0' into feature/par…
marinegor Feb 21, 2024
7e2ae21
Apply suggestions from code review
marinegor Feb 21, 2024
fded90e
Add parallelization section to the documentation
marinegor Feb 23, 2024
605b451
Fix versionadded in new classes
marinegor Feb 23, 2024
e2f1e1b
Finish parallelization section for documentation
marinegor Feb 23, 2024
f307d1e
Sync with develop
marinegor Feb 23, 2024
16030ad
Fix typos
marinegor Feb 23, 2024
c13f526
Merge branch 'develop' into feature/dask-0
RMeli Mar 4, 2024
4d0c8d3
Apply suggestions from code review
Mar 6, 2024
95b004f
Apply suggestions from code review
Mar 6, 2024
6e8de75
Refactor TreadsBackend example and add a warning
marinegor Apr 8, 2024
8d6bbfd
Add n_workers instantiation from backend argument
marinegor Apr 8, 2024
c9eaab9
Update package/MDAnalysis/analysis/backends.py
Apr 8, 2024
95d969e
Update package/doc/sphinx/source/documentation_pages/analysis/paralle…
Apr 8, 2024
c438381
Add remark about RMSF parallelization
marinegor Apr 8, 2024
140f252
Merge remote-tracking branch 'origin/feature/dask-0' into feature/par…
marinegor Apr 8, 2024
69a44ec
Merge develop and fix merge conflicts
marinegor Apr 8, 2024
7a67248
Apply suggestions from codereview
marinegor Apr 19, 2024
9c45568
Apply suggestions from code review
marinegor Apr 19, 2024
35fb1ae
Fix documentation typo
marinegor Apr 19, 2024
a8e0ccc
Update dask installation test after exception text changed
marinegor Apr 21, 2024
ab394a4
Merge branch 'develop' into feature/dask-0
yuxuanzhuang Apr 28, 2024
1324890
Merge branch 'feature/dask-0' of github.com:marinegor/mdanalysis into…
orbeckst May 1, 2024
68a7d23
edited documentation for parallelization
orbeckst May 4, 2024
41f231c
analysis top level docs fixes
orbeckst May 5, 2024
0964974
Merge remote-tracking branch 'origin/feature/dask-0' into feature/par…
marinegor May 20, 2024
9627212
Added comments regarding `_is_parallelizable` (and fixed documentatio…
marinegor May 20, 2024
99d43b9
Rename AnalysisBase.parallelizable and fix parallelizable transformat…
marinegor May 22, 2024
d63a38b
Remove explicit parallelizable=True in NoJump test call
marinegor May 22, 2024
c51139b
Merge branch 'develop' into feature/dask-0
orbeckst Jun 11, 2024
2d6fc0a
Apply suggestions from code review
orbeckst Jun 11, 2024
862cfb4
add explicit comment to AnalysisBase._analysis_algorithm_is_paralleli…
orbeckst Jun 11, 2024
fa87c2d
Add client_RMSD explanation
marinegor Jun 12, 2024
bb9df26
versioninformation markup fix in base.py
orbeckst Jun 13, 2024
57b581c
Merge branch 'develop' into feature/dask-0
orbeckst Jun 18, 2024
4eddd8c
Merge branch 'develop' into feature/dask-0
marinegor Jul 29, 2024
f182b9e
Apply suggestions from code review
marinegor Aug 5, 2024
e1f4e28
Apply suggestions from code review
marinegor Aug 5, 2024
d543f42
Add comments explaining client_... fixtures
marinegor Aug 5, 2024
611398d
Move class properties to the top of the class
marinegor Aug 5, 2024
e31e317
Undo accidental versionadded change
marinegor Aug 5, 2024
ccd1842
Remove duplicating versionadded
marinegor Aug 5, 2024
2ca27d0
Add versionadded for backend
marinegor Aug 5, 2024
c063585
Add link to github profile
marinegor Aug 5, 2024
8bbd901
Update package/doc/sphinx/source/documentation_pages/analysis/paralle…
orbeckst Aug 7, 2024
afba2c3
Update testsuite/MDAnalysisTests/analysis/test_backends.py
marinegor Aug 11, 2024
9fe37f9
minor text fixes
orbeckst Aug 15, 2024
ff4d0c2
Update package/MDAnalysis/analysis/base.py
marinegor Aug 16, 2024
f6672ff
Update package/MDAnalysis/analysis/base.py
marinegor Aug 16, 2024
1dc4613
Remove issubclass check
marinegor Aug 16, 2024
222 changes: 219 additions & 3 deletions package/MDAnalysis/analysis/base.py
@@ -131,8 +131,49 @@
from MDAnalysis.core.groups import AtomGroup
from MDAnalysis.lib.log import ProgressBar

from functools import partial

logger = logging.getLogger(__name__)

def localdelayed(obj):
    """
    Mock implementation of `dask.delayed.delayed` function
    with the same semantics
    """
    if isinstance(obj, Iterable):

        class inner:
            def __init__(self, iterable):
                self._computations = iterable

            def compute(self):
                return [f.compute() for f in self._computations]

        return inner(obj)

    elif isinstance(obj, Callable):

        class inner:
            def __init__(self, *a, **kwa):
                self._a = a
                self._kwa = kwa
                self._func = obj

            def compute(self):
                return self._func(*self._a, **self._kwa)

        return inner
    else:
        raise ValueError(f"Argument should be Iterable or Callable, got {type(obj)}")
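
Review note: a minimal usage sketch of the mock above, written as an illustration and not part of the diff. It restates the function compactly (the inner class names `Batch` and `Call` and the helper `square` are hypothetical) and shows the two-level pattern that `run()` relies on: each call is wrapped individually, then the list of wrapped calls is wrapped once more and computed in one go.

```python
from collections.abc import Callable, Iterable


def localdelayed(obj):
    """Stand-in for dask.delayed: wraps a callable or a list of wrapped calls."""
    if isinstance(obj, Iterable):

        class Batch:
            def __init__(self, computations):
                self._computations = computations

            def compute(self):
                # Evaluate every deferred call in order, in this process.
                return [c.compute() for c in self._computations]

        return Batch(obj)

    if isinstance(obj, Callable):

        class Call:
            def __init__(self, *args, **kwargs):
                # Store the arguments; nothing is executed yet.
                self._args, self._kwargs = args, kwargs

            def compute(self):
                return obj(*self._args, **self._kwargs)

        return Call

    raise ValueError(f"Argument should be Iterable or Callable, got {type(obj)}")


def square(x):
    return x * x


tasks = localdelayed([localdelayed(square)(n) for n in range(4)])
results = tasks.compute()
print(results)
```

Nothing runs until `compute()` is called, which is the property that lets the same calling code work with either this mock or the real `dask.delayed`.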

from itertools import islice

def split_every(n, iterable):
    i = iter(iterable)
    piece = list(islice(i, n))
    while piece:
        yield piece
        piece = list(islice(i, n))
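
Review note: a short, self-contained sketch of what `split_every` yields, assuming the behaviour shown in the diff (chunks of at most `n` items, the last one possibly shorter). The variable name `iterator` is a cosmetic change for the illustration only.

```python
from itertools import islice


def split_every(n, iterable):
    """Yield successive lists of at most n items from iterable."""
    iterator = iter(iterable)
    piece = list(islice(iterator, n))
    while piece:
        yield piece
        piece = list(islice(iterator, n))


chunks = list(split_every(3, range(8)))
print(chunks)
```

An empty input yields no chunks at all, so callers do not need to special-case it.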

class Results(UserDict):
r"""Container object for storing results.
@@ -219,6 +260,11 @@ def __getstate__(self):
def __setstate__(self, state):
self.data = state

from typing import Callable, Iterable


# from dask import delayed


class AnalysisBase(object):
r"""Base class for defining multi-frame analysis
@@ -389,9 +435,10 @@ def _conclude(self):
"""
pass # pylint: disable=unnecessary-pass

def run(self, start=None, stop=None, step=None, frames=None,
verbose=None, *, progressbar_kwargs={}):
"""Perform the calculation
def _compute(self, start=None, stop=None, step=None,
frames=None, verbose=None, *, progressbar_kwargs={}):
"""Perform the calculation on frames that have been setup prior to that
using _setup_frames()

Parameters
----------
@@ -447,8 +494,177 @@ def run(self, start=None, stop=None, step=None, frames=None,
self.times[i] = ts.time
self._single_frame()
logger.info("Finishing up")
return self

    def _setup_bslices(self, start=None, stop=None, step=None, frames=None,
                       n_bslices=None):
        """
        Set the self._bslices (the workload distribution scheme) for the future delayed
        computations.

        Parameters
        ----------
        start : int, optional
            start frame of analysis
        stop : int, optional
            stop frame of analysis
        step : int, optional
            number of frames to skip between each analysed frame
        frames : array_like, optional
            array of integers or booleans to slice trajectory; `frames` can
            only be used *instead* of `start`, `stop`, and `step`. Setting
            *both* `frames` and at least one of `start`, `stop`, `step` to a
            non-default value will raise a :exc:`ValueError`.

        Raises
        ------
        ValueError
            if *both* `frames` and at least one of `start`, `stop`, or `step`
            is provided (i.e., set to another value than ``None``)

        Returns
        -------
        bslices : list of (bstart, bstop, bstep, bframes) tuples.
            The list has self._n_bslices entries.
        """
        if frames is not None:
            if not all(opt is None for opt in [start, stop, step]):
                raise ValueError("start/stop/step cannot be combined with "
                                 "frames")
            self.n_frames_total = len(frames)
            n_bslices = self._n_bslices
            slices = [(None, None, None, frames[i::n_bslices]) for i in range(n_bslices)]

        else:  # frames is None
            start, stop, step = self._trajectory.check_slice_indices(start, stop, step)
            n_frames = len(range(start, stop, step))
            self.start, self.stop, self.step = start, stop, step
            self.n_frames_total = n_frames

            ## this part was taken from pmda with slight modifications
            n_bslices = self._n_bslices
            bsizes = np.ones(n_bslices, dtype=np.int64) * n_frames // n_bslices
            bsizes += np.arange(n_bslices, dtype=np.int64) < n_frames % n_bslices
            # This can give a last index that is larger than the real last index;
            # this is not a problem for slicing but it's not pretty.
            # Example: original [0:20:3] -> n_frames=7, start=0, step=3:
            #          last frame 21 instead of 20
            bsizes *= step
            idx = np.cumsum(np.concatenate(([start], bsizes)))
            slices = [(bstart, bstop, step, None)
                      for bstart, bstop in zip(idx[:-1], idx[1:])]

            # fix very last stop index: make sure it's within trajectory range or None
            # (not really critical because the slices will work regardless, but neater)
            last = slices[-1]
            last_stop = min(last[1], stop) if stop is not None else stop
            slices[-1] = (last[0], last_stop, last[2], None)

        self._bslices = slices
        return self._bslices
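
Review note: the block arithmetic in the `frames is None` branch can be checked in isolation. The sketch below is a pure-Python restatement under stated assumptions: `block_slices` is a hypothetical helper name, it takes already-validated integer `start`, `stop`, `step`, and it leaves out the `frames` branch and the trajectory object entirely. It reuses the example from the code comment (`[0:20:3]` split into three blocks).

```python
def block_slices(start, stop, step, n_blocks):
    """Split range(start, stop, step) into n_blocks contiguous (start, stop, step) slices."""
    n_frames = len(range(start, stop, step))
    base, extra = divmod(n_frames, n_blocks)
    # The first `extra` blocks get one additional frame each.
    sizes = [base + (1 if i < extra else 0) for i in range(n_blocks)]

    # Cumulative block boundaries, expressed in trajectory frame indices.
    bounds = [start]
    for size in sizes:
        bounds.append(bounds[-1] + size * step)

    slices = [(b0, b1, step) for b0, b1 in zip(bounds[:-1], bounds[1:])]

    # Clamp the final stop so it does not run past the requested stop.
    last_start, last_stop, _ = slices[-1]
    slices[-1] = (last_start, min(last_stop, stop), step)
    return slices


slices = block_slices(0, 20, 3, 3)
frames = [f for b0, b1, s in slices for f in range(b0, b1, s)]
print(slices)
print(frames)
```

Concatenating the frames of all blocks reproduces the original frame sequence, which is the invariant a parallel run depends on. When there are more blocks than frames, the trailing blocks come out empty.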


    def _setup_scheduler(self, scheduler, n_workers):
        """
        Configure parameters necessary for running a distributed workload.

        Parameters
        ----------
        scheduler : dask.distributed.Client
            dask scheduler object
        n_workers : int, optional
            number of workers (local or remote processes).
        """
        if scheduler == 'localdask':
            n_workers = 1
        else:
            from dask.distributed import Client
            if isinstance(scheduler, Client):
                # TODO: add assertions that make sure that
                # you don't set
                # both `scheduler` and `n_workers/threads_per_worker`
                n_workers = len(scheduler.ncores())
            else:
                kwargs = {'n_workers':n_workers}
                self._scheduler_kwargs = kwargs
        self._n_bslices = self._n_workers = n_workers

    def run(self, start=None, stop=None, step=None, frames=None,
            verbose=None, n_workers=1,
            *, scheduler=None,
            progressbar_kwargs={},
            ):
        """Perform the calculation

        Parameters
        ----------
        start : int, optional
            start frame of analysis
        stop : int, optional
            stop frame of analysis
        step : int, optional
            number of frames to skip between each analysed frame
        frames : array_like, optional
            array of integers or booleans to slice trajectory; `frames` can
            only be used *instead* of `start`, `stop`, and `step`. Setting
            *both* `frames` and at least one of `start`, `stop`, `step` to a
            non-default value will raise a :exc:`ValueError`.

            .. versionadded:: 2.2.0

        verbose : bool, optional
            Turn on verbosity

        n_workers : int, optional
            number of workers (local or remote processes); only used when
            `scheduler` is not ``None``.

        scheduler : str or dask.distributed.Client, optional
            ``None`` (default) runs the analysis in the current process;
            ``'localdask'`` uses a mock of ``dask.delayed`` (mainly for
            testing); any other value is treated as a dask scheduler and
            requires dask to be installed.

        progressbar_kwargs : dict, optional
            ProgressBar keywords with custom parameters regarding progress bar position, etc;
            see :class:`MDAnalysis.lib.log.ProgressBar` for full list.


        .. versionchanged:: 2.2.0
            Added ability to analyze arbitrary frames by passing a list of
            frame indices in the `frames` keyword argument.

        .. versionchanged:: 2.5.0
            Add `progressbar_kwargs` parameter,
            allowing to modify description, position etc of tqdm progressbars
        """
        if scheduler is None:  # fallback to the local scheduler
            self._compute(start=start, stop=stop, step=step, frames=frames,
                          verbose=verbose, progressbar_kwargs=progressbar_kwargs)
        else:
            self._setup_scheduler(scheduler=scheduler, n_workers=n_workers)
            if scheduler == 'localdask':  # imitation of dask mainly for testing purposes
                delayed = localdelayed
            else:  # elif scheduler is a type of dask scheduler
                try:
                    from dask.delayed import delayed
                except ImportError:
                    def delayed(*args, **kwargs):  # need implementation to avoid syntax warnings below
                        pass
                    # raising exception here allows me to skip potentially time-consuming setup steps
                    raise ImportError('Please install dask for this functionality')
            self._setup_bslices(start=start, stop=stop, step=step,
                                frames=frames)

            computations = delayed(
                [
                    delayed(self._compute)(start=bstart, stop=bstop, step=bstep, frames=bframes)
                    for bstart, bstop, bstep, bframes in self._bslices
                ]
            )
            dask_results = computations.compute()
            self._remote_results = dask_results
            self._parallel_conclude()

        self._conclude()

        return self

    def _parallel_conclude(self):
        self.results = self._remote_results[0].results
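
Review note: in this revision `_parallel_conclude` keeps only the results of the first block, which is sufficient for `'localdask'` because `_setup_scheduler` forces a single block there. With more than one block, the per-block results would have to be combined. The sketch below shows one possible shape of such a merge for list-valued results. It is an assumption for illustration only: `merge_block_results` and the plain-dict result layout are hypothetical and are not the aggregation API this pull request eventually settled on.

```python
def merge_block_results(block_results):
    """Concatenate list-valued entries from per-block result dicts, in block order."""
    merged = {}
    for block in block_results:
        for key, values in block.items():
            # Blocks arrive in trajectory order, so extending preserves frame order.
            merged.setdefault(key, []).extend(values)
    return merged


# Hypothetical per-block results for frames [0, 3, 6] and [9, 12].
blocks = [
    {"frames": [0, 3, 6], "rmsd": [0.0, 1.2, 1.5]},
    {"frames": [9, 12], "rmsd": [1.7, 1.9]},
]
merged = merge_block_results(blocks)
print(merged)
```

Concatenation only suits per-frame timeseries; quantities such as averages or histograms would need a different reduction, which is presumably why the later commits listed above add per-class `_parallel_conclude` implementations.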


class AnalysisFromFunction(AnalysisBase):
17 changes: 17 additions & 0 deletions testsuite/MDAnalysisTests/analysis/conftest.py
@@ -0,0 +1,17 @@
import pytest

@pytest.fixture(params=[None, 'localdask',], scope='module')
def scheduler(request):
    """
    Fixture for testing all possible schedulers.
    If scheduler raises an exception, you should explicitly check it
    using `with pytest.raises(NotImplementedError)`
    """
    return request.param

@pytest.fixture(params=[None])
def localscheduler(request):
    """
    Fixture for testing only local, i.e. before-dask, codepath.
    """
    return request.param
32 changes: 16 additions & 16 deletions testsuite/MDAnalysisTests/analysis/test_align.py
@@ -417,9 +417,9 @@ def universe(self):
def reference(self):
return mda.Universe(PSF, CRD)

def test_average_structure_deprecated_attrs(self, universe, reference):
def test_average_structure_deprecated_attrs(self, universe, reference, scheduler):
# Issue #3278 - remove in MDAnalysis 3.0.0
avg = align.AverageStructure(universe, reference).run(stop=2)
avg = align.AverageStructure(universe, reference).run(stop=2, scheduler=scheduler)

wmsg = "The `universe` attribute was deprecated in MDAnalysis 2.0.0"
with pytest.warns(DeprecationWarning, match=wmsg):
@@ -434,45 +434,45 @@ def test_average_structure_deprecated_attrs(self, universe, reference):
with pytest.warns(DeprecationWarning, match=wmsg):
assert avg.rmsd == avg.results.rmsd

def test_average_structure(self, universe, reference):
def test_average_structure(self, universe, reference, scheduler):
ref, rmsd = _get_aligned_average_positions(self.ref_files, reference)
avg = align.AverageStructure(universe, reference).run()
avg = align.AverageStructure(universe, reference).run(scheduler=scheduler)
assert_almost_equal(avg.results.universe.atoms.positions, ref,
decimal=4)
assert_almost_equal(avg.results.rmsd, rmsd)

def test_average_structure_mass_weighted(self, universe, reference):
def test_average_structure_mass_weighted(self, universe, reference, scheduler):
ref, rmsd = _get_aligned_average_positions(self.ref_files, reference, weights='mass')
avg = align.AverageStructure(universe, reference, weights='mass').run()
avg = align.AverageStructure(universe, reference, weights='mass').run(scheduler=scheduler)
assert_almost_equal(avg.results.universe.atoms.positions, ref,
decimal=4)
assert_almost_equal(avg.results.rmsd, rmsd)

def test_average_structure_select(self, universe, reference):
def test_average_structure_select(self, universe, reference, scheduler):
select = 'protein and name CA and resid 3-5'
ref, rmsd = _get_aligned_average_positions(self.ref_files, reference, select=select)
avg = align.AverageStructure(universe, reference, select=select).run()
avg = align.AverageStructure(universe, reference, select=select).run(scheduler=scheduler)
assert_almost_equal(avg.results.universe.atoms.positions, ref,
decimal=4)
assert_almost_equal(avg.results.rmsd, rmsd)

def test_average_structure_no_ref(self, universe):
def test_average_structure_no_ref(self, universe, scheduler):
ref, rmsd = _get_aligned_average_positions(self.ref_files, universe)
avg = align.AverageStructure(universe).run()
avg = align.AverageStructure(universe).run(scheduler=scheduler)
assert_almost_equal(avg.results.universe.atoms.positions, ref,
decimal=4)
assert_almost_equal(avg.results.rmsd, rmsd)

def test_average_structure_no_msf(self, universe):
avg = align.AverageStructure(universe).run()
def test_average_structure_no_msf(self, universe, scheduler):
avg = align.AverageStructure(universe).run(scheduler=scheduler)
assert not hasattr(avg, 'msf')

def test_mismatch_atoms(self, universe):
u = mda.Merge(universe.atoms[:10])
with pytest.raises(SelectionError):
align.AverageStructure(universe, u)

def test_average_structure_ref_frame(self, universe):
def test_average_structure_ref_frame(self, universe, scheduler):
ref_frame = 3
u = mda.Merge(universe.atoms)

@@ -483,13 +483,13 @@ def test_average_structure_ref_frame(self, universe):
# back to start
universe.trajectory[0]
ref, rmsd = _get_aligned_average_positions(self.ref_files, u)
avg = align.AverageStructure(universe, ref_frame=ref_frame).run()
avg = align.AverageStructure(universe, ref_frame=ref_frame).run(scheduler=scheduler)
assert_almost_equal(avg.results.universe.atoms.positions, ref,
decimal=4)
assert_almost_equal(avg.results.rmsd, rmsd)

def test_average_structure_in_memory(self, universe):
avg = align.AverageStructure(universe, in_memory=True).run()
def test_average_structure_in_memory(self, universe, scheduler):
avg = align.AverageStructure(universe, in_memory=True).run(scheduler=scheduler)
reference_coordinates = universe.trajectory.timeseries().mean(axis=1)
assert_almost_equal(avg.results.universe.atoms.positions,
reference_coordinates, decimal=4)
10 changes: 6 additions & 4 deletions testsuite/MDAnalysisTests/analysis/test_atomicdistances.py
@@ -115,21 +115,23 @@ def test_ad_exceptions(self, ad_ag1, ad_ag3, ad_ag4):
# only need to test that this class correctly applies distance calcs
# calc_bonds() is tested elsewhere
def test_ad_pairwise_dist(self, ad_ag1, ad_ag2,
expected_dist):
expected_dist,
scheduler):
'''Ensure that pairwise distances between atoms are
correctly calculated without PBCs.'''
pairwise_no_pbc = (ad.AtomicDistances(ad_ag1, ad_ag2,
pbc=False).run())
pbc=False).run(scheduler=scheduler))
actual = pairwise_no_pbc.results

# compare with expected values from dist()
assert_allclose(actual, expected_dist)

def test_ad_pairwise_dist_pbc(self, ad_ag1, ad_ag2,
expected_pbc_dist):
expected_pbc_dist,
scheduler):
'''Ensure that pairwise distances between atoms are
correctly calculated with PBCs.'''
pairwise_pbc = (ad.AtomicDistances(ad_ag1, ad_ag2).run())
pairwise_pbc = (ad.AtomicDistances(ad_ag1, ad_ag2).run(scheduler=scheduler))
actual = pairwise_pbc.results

# compare with expected values from dist()