diff --git a/.travis.yml b/.travis.yml index 54ed50fd..93807f41 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,7 +15,7 @@ env: # Set default python version to avoid repetition later - PYTHON_VERSION=3.6 - MAIN_CMD="pytest" - - SETUP_CMD="pmda --pep8 -v --cov pmda" + - SETUP_CMD="pmda --pep8 --cov pmda" # mdanalysis develop from source (see below), which needs # minimal CONDA_MDANALYSIS_DEPENDENCIES #- CONDA_DEPENDENCIES="mdanalysis mdanalysistests dask joblib pytest-pep8 mock codecov cython hypothesis sphinx" diff --git a/AUTHORS b/AUTHORS index d283ee33..8c75971a 100644 --- a/AUTHORS +++ b/AUTHORS @@ -22,4 +22,5 @@ Chronological list of authors 2018 - Shujie Fan - Richard J Gowers + - Ioannis Paraskevakos diff --git a/CHANGELOG b/CHANGELOG index 21572956..a7c18fd2 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -13,7 +13,7 @@ The rules for this file: * release numbers follow "Semantic Versioning" http://semver.org ------------------------------------------------------------------------------ -xx/xx/18 VOD555, richardjgowers +xx/xx/18 VOD555, richardjgowers, iparask, orbeckst * 0.2.0 @@ -21,6 +21,11 @@ Enhancements * add add timing for _conclude and _prepare (Issue #49) * add parallel particle-particle RDF calculation module pmda.rdf (Issue #41) * add readonly_attributes context manager to ParallelAnalysisBase + * add parallel implementation of Leaflet Finder (Issue #47) + +Fixes + * always distribute frames over blocks so that no empty blocks are + created ("balanced blocks", Issue #71) 06/07/18 orbeckst diff --git a/conftest.py b/conftest.py new file mode 100644 index 00000000..a2b4ee6b --- /dev/null +++ b/conftest.py @@ -0,0 +1,32 @@ + +# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*- +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 +# +# PMDA +# Copyright (c) 2017 The MDAnalysis Development Team and contributors +# (see the file AUTHORS for the full list of names) +# +# Released under the GNU Public Licence, v2 or any higher version + +from dask import distributed, multiprocessing +import pytest + + +@pytest.fixture(scope="session", params=(1, 2)) +def client(tmpdir_factory, request): + with tmpdir_factory.mktemp("dask_cluster").as_cwd(): + lc = distributed.LocalCluster(n_workers=request.param, processes=True) + client = distributed.Client(lc) + + yield client + + client.close() + lc.close() + + +@pytest.fixture(scope='session', params=('distributed', 'multiprocessing')) +def scheduler(request, client): + if request.param == 'distributed': + return client + else: + return multiprocessing diff --git a/docs/api.rst b/docs/api.rst index 7d2e3668..92708222 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -26,6 +26,7 @@ a single frame. If your need more flexibility you can use the api/parallel api/custom + api/util .. _pre-defined-analysis-tasks: @@ -42,3 +43,4 @@ also function as examples for how to implement your own functions with api/rms api/contacts api/rdf + api/leaflet diff --git a/docs/api/leaflet.rst b/docs/api/leaflet.rst new file mode 100644 index 00000000..11767989 --- /dev/null +++ b/docs/api/leaflet.rst @@ -0,0 +1,2 @@ +.. automodule:: pmda.leaflet + diff --git a/docs/api/util.rst b/docs/api/util.rst new file mode 100644 index 00000000..30d578ec --- /dev/null +++ b/docs/api/util.rst @@ -0,0 +1,2 @@ +.. automodule:: pmda.util + :members: diff --git a/docs/conf.py b/docs/conf.py index d9fb6398..a918e6b7 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -57,7 +57,7 @@ # General information about the project. 
 project = u'PMDA'
-author = u'Max Linke, Shujie Fan, Richard J. Gowers, Oliver Beckstein'
+author = u'Max Linke, Shujie Fan, Richard J. Gowers, Oliver Beckstein, Ioannis Paraskevakos'
 copyright = u'2018, ' + author
diff --git a/docs/references.rst b/docs/references.rst
index 4655c4dd..9f9a2a1d 100644
--- a/docs/references.rst
+++ b/docs/references.rst
@@ -31,6 +31,15 @@ Huff, David Lippa, Dillon Niederhut, and M. Pacer,
    editors, Proceedings of the 16th Python in Science Conference, pages 64–72, Austin,
-   TX, 2017. SciPy. doi:`10.25080/shinma-7f4c6e7-00a`_
+   TX, 2017. SciPy. doi:`10.25080/shinma-7f4c6e7-00a`_.
-.. _`10.25080/shinma-7f4c6e7-00a`: https://doi.org/10.25080/shinma-7f4c6e7-00a
+.. _`10.25080/shinma-7f4c6e7-00a`: https://doi.org/10.25080/shinma-7f4c6e7-00a
+
+.. [Paraskevakos2018] Ioannis Paraskevakos, Andre Luckow, Mahzad Khoshlessan,
+                      George Chantzialexiou, Thomas E. Cheatham, Oliver
+                      Beckstein, Geoffrey C. Fox and Shantenu Jha. Task-parallel
+                      Analysis of Molecular Dynamics Trajectories. In
+                      47th International Conference on Parallel Processing
+                      (ICPP 2018). doi:`10.1145/3225058.3225128`_.
+
+.. _`10.1145/3225058.3225128`: https://doi.org/10.1145/3225058.3225128
\ No newline at end of file
diff --git a/pmda/leaflet.py b/pmda/leaflet.py
new file mode 100644
index 00000000..3f879ae5
--- /dev/null
+++ b/pmda/leaflet.py
@@ -0,0 +1,310 @@
+# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
+#
+# PMDA
+# Copyright (c) 2018 The MDAnalysis Development Team and contributors
+# (see the file AUTHORS for the full list of names)
+#
+# Released under the GNU Public Licence, v2 or any higher version
+"""
+LeafletFinder Analysis tool --- :mod:`pmda.leaflet`
+==========================================================
+
+This module contains parallel versions of analysis tasks in
+:mod:`MDAnalysis.analysis.leaflet`.
+
+.. autoclass:: LeafletFinder
+   :members:
+   :undoc-members:
+   :inherited-members:
+
+"""
+from __future__ import absolute_import, division
+
+import numpy as np
+import dask.bag as db
+import networkx as nx
+from scipy.spatial import cKDTree
+
+import MDAnalysis as mda
+from dask import distributed, multiprocessing
+from joblib import cpu_count
+
+from .parallel import ParallelAnalysisBase, Timing
+from .util import timeit
+
+
+class LeafletFinder(ParallelAnalysisBase):
+    """Parallel Leaflet Finder analysis.
+
+    Identify atoms in the same leaflet of a lipid bilayer.
+    This class implements and parallelizes the *LeafletFinder* algorithm
+    [Michaud-Agrawal2011]_.
+
+    The parallelization is done based on [Paraskevakos2018]_.
+
+    Attributes
+    ----------
+
+    Parameters
+    ----------
+    Universe : :class:`~MDAnalysis.core.groups.Universe`
+        a :class:`MDAnalysis.core.groups.Universe` (the
+        `atomgroup` must belong to this Universe)
+    atomgroup : tuple of :class:`~MDAnalysis.core.groups.AtomGroup`
+        atomgroups that are iterated in parallel
+
+    Note
+    ----
+    At the moment, this class has far fewer features than the serial
+    version :class:`MDAnalysis.analysis.leaflet.LeafletFinder`.
+
+    This version offers Leaflet Finder algorithm 4 ("Tree-based Nearest
+    Neighbor and Parallel-Connected Components (Tree-Search)") in
+    [Paraskevakos2018]_.
+
+    Currently, periodic boundaries are not taken into account.
+
+    The calculation is parallelized on a per-frame basis;
+    at the moment, no parallelization over trajectory blocks is performed.
+ + """ + + def __init__(self, universe, atomgroups): + self._atomgroup = atomgroups + self._results = list() + + super(LeafletFinder, self).__init__(universe, (atomgroups,)) + + def _find_connected_components(self, data, cutoff=15.0): + """Perform the Connected Components discovery for the atoms in data. + + Parameters + ---------- + data : Tuple of lists of Numpy arrays + This is a data and index tuple. The data are organized as + `([AtomPositions1,AtomPositions2], + [index1,index2])`. `index1` and `index2` are showing the + position of the `AtomPosition` in the adjacency matrix and + allows to correct the node number of the produced graph. + cutoff : float (optional) + head group-defining atoms within a distance of `cutoff` + Angstroms are deemed to be in the same leaflet [15.0] + + Returns + ------- + values : list. + A list of all the connected components of the graph that is + generated from `data` + + """ + window, index = data[0] + num = window[0].shape[0] + i_index = index[0] + j_index = index[1] + graph = nx.Graph() + + if i_index == j_index: + train = window[0] + test = window[1] + else: + train = np.vstack([window[0], window[1]]) + test = np.vstack([window[0], window[1]]) + + tree = cKDTree(train, leafsize=40) + edges = tree.query_ball_point(test, cutoff) + edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list)) + for idx, dest_list in enumerate(edges)] + + edge_list_flat = np.array([list(item) for sublist in edge_list for + item in sublist]) + + if i_index == j_index: + res = edge_list_flat.transpose() + res[0] = res[0] + i_index - 1 + res[1] = res[1] + j_index - 1 + else: + removed_elements = list() + for i in range(edge_list_flat.shape[0]): + if (edge_list_flat[i, 0] >= 0 and + edge_list_flat[i, 0] <= num - 1) and \ + (edge_list_flat[i, 1] >= 0 and + edge_list_flat[i, 1] <= num - 1) or \ + (edge_list_flat[i, 0] >= num and + edge_list_flat[i, 0] <= 2 * num - 1) and \ + (edge_list_flat[i, 1] >= num and + edge_list_flat[i, 1] <= 2 * num - 1) or \ + (edge_list_flat[i, 0] >= num and + edge_list_flat[i, 0] <= 2 * num - 1) and \ + (edge_list_flat[i, 1] >= 0 and + edge_list_flat[i, 1] <= num - 1): + removed_elements.append(i) + res = np.delete(edge_list_flat, removed_elements, + axis=0).transpose() + res[0] = res[0] + i_index - 1 + res[1] = res[1] - num + j_index - 1 + if res.shape[1] == 0: + res = np.zeros((2, 1), dtype=np.int) + + edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])] + graph.add_edges_from(edges) + + # partial connected components + + subgraphs = nx.connected_components(graph) + comp = [g for g in subgraphs] + return comp + + # pylint: disable=arguments-differ + def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs, + cutoff=15.0): + """Perform computation on a single trajectory frame. + + Must return computed values as a list. You can only **read** + from member variables stored in ``self``. Changing them during + a run will result in undefined behavior. `ts` and any of the + atomgroups can be changed (but changes will be overwritten + when the next time step is read). + + Parameters + ---------- + scheduler_kwargs : Dask Scheduler parameters. + cutoff : float (optional) + head group-defining atoms within a distance of `cutoff` + Angstroms are deemed to be in the same leaflet [15.0] + + Returns + ------- + values : anything + The output from the computation over a single frame must + be returned. 
The `value` will be added to a list for each + block and the list of blocks is stored as :attr:`_results` + before :meth:`_conclude` is run. In order to simplify + processing, the `values` should be "simple" shallow data + structures such as arrays or lists of numbers. + + """ + + # Get positions of the atoms in the atomgroup and find their number. + atoms = ts.positions[atomgroups.indices] + matrix_size = atoms.shape[0] + arranged_coord = list() + part_size = int(matrix_size / n_jobs) + # Partition the data based on a 2-dimensional partitioning + for i in range(1, matrix_size + 1, part_size): + for j in range(i, matrix_size + 1, part_size): + arranged_coord.append(([atoms[i - 1:i - 1 + part_size], + atoms[j - 1:j - 1 + part_size]], + [i, j])) + # Distribute the data over the available cores, apply the map function + # and execute. + parAtoms = db.from_sequence(arranged_coord, + npartitions=len(arranged_coord)) + parAtomsMap = parAtoms.map_partitions(self._find_connected_components, + cutoff=cutoff) + Components = parAtomsMap.compute(**scheduler_kwargs) + + # Gather the results and start the reduction. TODO: think if it can go + # to the private _reduce method of the based class. + result = list(Components) + + # Create the overall connected components of the graph + while len(result) != 0: + item1 = result[0] + result.pop(0) + ind = [] + for i, item2 in enumerate(Components): + if item1.intersection(item2): + item1 = item1.union(item2) + ind.append(i) + ind.reverse() + for j in ind: + Components.pop(j) + Components.append(item1) + + # Change output for and return. + indices = [np.sort(list(g)) for g in Components] + return indices + + # pylint: disable=arguments-differ + def run(self, + start=None, + stop=None, + step=None, + scheduler=None, + n_jobs=-1, + cutoff=15.0): + """Perform the calculation + + Parameters + ---------- + start : int, optional + start frame of analysis + stop : int, optional + stop frame of analysis + step : int, optional + number of frames to skip between each analysed frame + scheduler : dask scheduler, optional + Use dask scheduler, defaults to multiprocessing. This can be used + to spread work to a distributed scheduler + n_jobs : int, optional + number of tasks to start, if `-1` use number of logical cpu cores. + This argument will be ignored when the distributed scheduler is + used + + """ + if scheduler is None: + scheduler = multiprocessing + + if n_jobs == -1: + if scheduler == multiprocessing: + n_jobs = cpu_count() + elif isinstance(scheduler, distributed.Client): + n_jobs = len(scheduler.ncores()) + else: + raise ValueError( + "Couldn't guess ideal number of jobs from scheduler." + "Please provide `n_jobs` in call to method.") + + with timeit() as b_universe: + universe = mda.Universe(self._top, self._traj) + + scheduler_kwargs = {'get': scheduler.get} + if scheduler == multiprocessing: + scheduler_kwargs['num_workers'] = n_jobs + + start, stop, step = self._trajectory.check_slice_indices( + start, stop, step) + with timeit() as total: + with timeit() as prepare: + self._prepare() + + with self.readonly_attributes(): + timings = list() + times_io = [] + for frame in range(start, stop, step): + with timeit() as b_io: + ts = universe.trajectory[frame] + times_io.append(b_io.elapsed) + with timeit() as b_compute: + components = self. 
\ + _single_frame(ts=ts, + atomgroups=self._atomgroup, + scheduler_kwargs=scheduler_kwargs, + n_jobs=n_jobs, + cutoff=cutoff) + + timings.append(b_compute.elapsed) + leaflet1 = self._atomgroup[components[0]] + leaflet2 = self._atomgroup[components[1]] + self._results.append([leaflet1, leaflet2]) + with timeit() as conclude: + self._conclude() + self.timing = Timing(times_io, + np.hstack(timings), total.elapsed, + b_universe.elapsed, prepare.elapsed, + conclude.elapsed) + return self + + def _conclude(self): + self.results = self._results diff --git a/pmda/parallel.py b/pmda/parallel.py index d56bb6ac..0c5d9fa0 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -16,6 +16,8 @@ """ from __future__ import absolute_import, division from contextlib import contextmanager +import warnings + from six.moves import range import MDAnalysis as mda @@ -24,7 +26,7 @@ from joblib import cpu_count import numpy as np -from .util import timeit +from .util import timeit, make_balanced_slices class Timing(object): @@ -313,7 +315,15 @@ def run(self, start, stop, step = self._trajectory.check_slice_indices( start, stop, step) n_frames = len(range(start, stop, step)) - bsize = int(np.ceil(n_frames / float(n_blocks))) + + if n_frames == 0: + warnings.warn("run() analyses no frames: check start/stop/step") + if n_frames < n_blocks: + warnings.warn("run() uses more blocks than frames: " + "decrease n_blocks") + + slices = make_balanced_slices(n_frames, n_blocks, + start=start, stop=stop, step=step) with timeit() as total: with timeit() as prepare: @@ -321,18 +331,20 @@ def run(self, time_prepare = prepare.elapsed blocks = [] with self.readonly_attributes(): - for b in range(n_blocks): + for bslice in slices: task = delayed( self._dask_helper, pure=False)( - b * bsize * step + start, - min(stop, (b + 1) * bsize * step + start), - step, + bslice, self._indices, self._top, self._traj, ) blocks.append(task) blocks = delayed(blocks) res = blocks.compute(**scheduler_kwargs) + # hack to handle n_frames == 0 in this framework + if len(res) == 0: + # everything else wants list of block tuples + res = [([], [], [], 0)] self._results = np.asarray([el[0] for el in res]) with timeit() as conclude: self._conclude() @@ -343,7 +355,7 @@ def run(self, np.array([el[3] for el in res]), time_prepare, conclude.elapsed) return self - def _dask_helper(self, start, stop, step, indices, top, traj): + def _dask_helper(self, bslice, indices, top, traj): """helper function to actually setup dask graph""" with timeit() as b_universe: u = mda.Universe(top, traj) @@ -352,8 +364,12 @@ def _dask_helper(self, start, stop, step, indices, top, traj): res = [] times_io = [] times_compute = [] - for i in range(start, stop, step): + # NOTE: bslice.stop cannot be None! Always make sure + # that it comes from _trajectory.check_slice_indices()! + for i in range(bslice.start, bslice.stop, bslice.step): with timeit() as b_io: + # explicit instead of 'for ts in u.trajectory[bslice]' + # so that we can get accurate timing. 
ts = u.trajectory[i] with timeit() as b_compute: res = self._reduce(res, self._single_frame(ts, agroups)) diff --git a/pmda/rdf.py b/pmda/rdf.py index eb26a44a..40aebc39 100644 --- a/pmda/rdf.py +++ b/pmda/rdf.py @@ -164,7 +164,7 @@ def _conclude(self, ): @staticmethod def _reduce(res, result_single_frame): """ 'add' action for an accumulator""" - if res == []: + if isinstance(res, list) and len(res) == 0: # Convert res from an empty list to a numpy array # which has the same shape as the single frame result res = result_single_frame diff --git a/pmda/test/test_custom.py b/pmda/test/test_custom.py index 5ec6d2e2..e29dcb97 100644 --- a/pmda/test/test_custom.py +++ b/pmda/test/test_custom.py @@ -8,32 +8,33 @@ # Released under the GNU Public Licence, v2 or any higher version from __future__ import absolute_import, division -import pytest import numpy as np - -from numpy.testing import assert_equal - import MDAnalysis as mda -from pmda import custom - from MDAnalysisTests.datafiles import PSF, DCD from MDAnalysisTests.util import no_deprecated_call +import pytest +from numpy.testing import assert_equal + +from pmda import custom def custom_function(mobile): return mobile.center_of_geometry() -def test_AnalysisFromFunction(): +def test_AnalysisFromFunction(scheduler): u = mda.Universe(PSF, DCD) step = 2 - ana1 = custom.AnalysisFromFunction(custom_function, u, - u.atoms).run(step=step) - ana2 = custom.AnalysisFromFunction(custom_function, u, - u.atoms).run(step=step) - ana3 = custom.AnalysisFromFunction(custom_function, u, - u.atoms).run(step=step) + ana1 = custom.AnalysisFromFunction(custom_function, u, u.atoms).run( + step=step, scheduler=scheduler + ) + ana2 = custom.AnalysisFromFunction(custom_function, u, u.atoms).run( + step=step, scheduler=scheduler + ) + ana3 = custom.AnalysisFromFunction(custom_function, u, u.atoms).run( + step=step, scheduler=scheduler + ) results = [] for ts in u.trajectory[::step]: @@ -53,8 +54,9 @@ def test_AnalysisFromFunction_otherAgs(): u2 = mda.Universe(PSF, DCD) u3 = mda.Universe(PSF, DCD) step = 2 - ana = custom.AnalysisFromFunction(custom_function_2, u, u.atoms, u2.atoms, - u3.atoms).run(step=step) + ana = custom.AnalysisFromFunction( + custom_function_2, u, u.atoms, u2.atoms, u3.atoms + ).run(step=step) results = [] for ts in u.trajectory[::step]: diff --git a/pmda/test/test_leaflet.py b/pmda/test/test_leaflet.py new file mode 100644 index 00000000..cf16b57b --- /dev/null +++ b/pmda/test/test_leaflet.py @@ -0,0 +1,64 @@ +from __future__ import absolute_import, division, print_function + +import pytest +from numpy.testing import (assert_almost_equal, assert_array_equal, + assert_array_almost_equal) + +import MDAnalysis +from MDAnalysisTests.datafiles import Martini_membrane_gro +from MDAnalysisTests.datafiles import GRO_MEMPROT, XTC_MEMPROT +from dask import multiprocessing +from pmda import leaflet +import numpy as np + + +class TestLeafLet(object): + + @pytest.fixture() + def u_one_frame(self): + return MDAnalysis.Universe(Martini_membrane_gro) + + @pytest.fixture() + def universe(self): + return MDAnalysis.Universe(GRO_MEMPROT, XTC_MEMPROT) + + @pytest.fixture() + def correct_values(self): + return [np.array([36507, 36761, 37523, 37650, 38031, 38285]), + np.array([36634]), + np.array([36507, 36761, 38285, 39174]), + np.array([36634]), + np.array([36507, 36761, 37650, 38285, 39174, 39936]), + np.array([36634]), + np.array([36507, 36761, 37650, 38285, 39174, 39428, 39936]), + np.array([36634]), + np.array([36507, 36761]), + np.array([36634])] + + 
@pytest.fixture() + def correct_values_single_frame(self): + return [np.arange(1, 2150, 12), np.arange(2521, 4670, 12)] + + def test_leaflet(self, universe, correct_values): + lipid_heads = universe.select_atoms("name P and resname POPG") + universe.trajectory.rewind() + leaflets = leaflet.LeafletFinder(universe, lipid_heads) + leaflets.run(scheduler=multiprocessing, n_jobs=1) + results = [atoms.indices for atomgroup in leaflets.results + for atoms in atomgroup] + [assert_almost_equal(x, y, err_msg="error: leaflets should match " + + "test values") for x, y in + zip(results, correct_values)] + + def test_leaflet_single_frame(self, + u_one_frame, + correct_values_single_frame): + lipid_heads = u_one_frame.select_atoms("name PO4") + u_one_frame.trajectory.rewind() + leaflets = leaflet.LeafletFinder(u_one_frame, + lipid_heads).run(start=0, stop=1) + + assert_almost_equal([atoms.indices for atomgroup in leaflets.results + for atoms in atomgroup], + correct_values_single_frame, err_msg="error: " + + "leaflets should match test values") diff --git a/pmda/test/test_parallel.py b/pmda/test/test_parallel.py index 974f9591..5ce82fe2 100644 --- a/pmda/test/test_parallel.py +++ b/pmda/test/test_parallel.py @@ -78,16 +78,17 @@ def test_sub_frames(analysis, n_jobs): np.testing.assert_almost_equal(analysis.res, [10, 20, 30, 40]) -@pytest.fixture(scope="session") -def client(tmpdir_factory): - with tmpdir_factory.mktemp("dask_cluster").as_cwd(): - lc = distributed.LocalCluster(n_workers=2, processes=True) - client = distributed.Client(lc) - - yield client - - client.close() - lc.close() +@pytest.mark.parametrize('n_jobs', (1, 2, 3)) +def test_no_frames(analysis, n_jobs): + u = mda.Universe(analysis._top, analysis._traj) + n_frames = u.trajectory.n_frames + with pytest.warns(UserWarning): + analysis.run(start=n_frames, stop=n_frames+1, n_jobs=n_jobs) + assert len(analysis.res) == 0 + np.testing.assert_equal(analysis.res, []) + np.testing.assert_equal(analysis.timing.compute, []) + np.testing.assert_equal(analysis.timing.io, []) + assert analysis.timing.universe == 0 @pytest.fixture(scope='session', params=('distributed', 'multiprocessing')) @@ -101,7 +102,15 @@ def scheduler(request, client): def test_scheduler(analysis, scheduler): analysis.run(scheduler=scheduler) - + +def test_nframes_less_nblocks_warning(analysis): + u = mda.Universe(analysis._top, analysis._traj) + n_frames = u.trajectory.n_frames + with pytest.warns(UserWarning): + analysis.run(stop=2, n_blocks=4, n_jobs=2) + assert len(analysis.res) == 2 + + @pytest.mark.parametrize('n_blocks', np.arange(1, 11)) def test_nblocks(analysis, n_blocks): analysis.run(n_blocks=n_blocks) diff --git a/pmda/test/test_util.py b/pmda/test/test_util.py index 97edd4aa..a8aea221 100644 --- a/pmda/test/test_util.py +++ b/pmda/test/test_util.py @@ -8,10 +8,15 @@ # Released under the GNU Public Licence, v2 or any higher version from __future__ import absolute_import +from six.moves import range + +import pytest + import time -from numpy.testing import assert_almost_equal +import numpy as np +from numpy.testing import assert_almost_equal, assert_equal -from pmda.util import timeit +from pmda.util import timeit, make_balanced_slices def test_timeit(): @@ -19,3 +24,109 @@ def test_timeit(): time.sleep(1) assert_almost_equal(timer.elapsed, 1, decimal=2) + + +@pytest.mark.parametrize("start", (None, 0, 1, 10)) +@pytest.mark.parametrize("n_frames,n_blocks,result", [ + (5, 1, [slice(0, None, 1)]), + (5, 2, [slice(0, 3, 1), slice(3, None, 1)]), + (5, 3, [slice(0, 2, 1), 
slice(2, 4, 1), slice(4, None, 1)]), + (5, 4, [slice(0, 2, 1), slice(2, 3, 1), slice(3, 4, 1), + slice(4, None, 1)]), + (5, 5, [slice(0, 1, 1), slice(1, 2, 1), slice(2, 3, 1), slice(3, 4, 1), + slice(4, None, 1)]), + (10, 2, [slice(0, 5, 1), slice(5, None, 1)]), + (10, 3, [slice(0, 4, 1), slice(4, 7, 1), slice(7, None, 1)]), + (10, 7, [slice(0, 2, 1), slice(2, 4, 1), slice(4, 6, 1), slice(6, 7, 1), + slice(7, 8, 1), slice(8, 9, 1), slice(9, None, 1)]), +]) +def test_make_balanced_slices_step1(n_frames, n_blocks, start, result, step=1): + assert step in (None, 1), "This test can only test step None or 1" + + _start = start if start is not None else 0 + _result = [slice(sl.start + _start, + sl.stop + _start if sl.stop is not None else None, + sl.step) for sl in result] + + slices = make_balanced_slices(n_frames, n_blocks, + start=start, step=step) + assert_equal(slices, _result) + + +def _test_make_balanced_slices(n_blocks, start, stop, step, scale): + _start = start if start is not None else 0 + + traj_frames = range(scale * stop) + frames = traj_frames[start:stop:step] + n_frames = len(frames) + + slices = make_balanced_slices(n_frames, n_blocks, + start=start, stop=stop, step=step) + + assert len(slices) == n_blocks + + # assemble frames again by blocks and show that we have all + # the original frames; get the sizes of the blocks + + block_frames = [] + block_sizes = [] + for bslice in slices: + bframes = traj_frames[bslice] + block_frames.extend(list(bframes)) + block_sizes.append(len(bframes)) + block_sizes = np.array(block_sizes) + + # check that we have all the frames accounted for + assert_equal(np.asarray(block_frames), np.asarray(frames)) + + # check that the distribution is balanced + if n_frames >= n_blocks: + assert np.all(block_sizes > 0) + minsize = n_frames // n_blocks + assert not np.setdiff1d(block_sizes, [minsize, minsize+1]), \ + "For n_blocks <= n_frames, block sizes are not balanced" + else: + # pathological case; we will have blocks with length 0 + # and n_blocks with 1 frame + zero_blocks = block_sizes == 0 + assert np.sum(zero_blocks) == n_blocks - n_frames + assert np.sum(~zero_blocks) == n_frames + assert not np.setdiff1d(block_sizes[~zero_blocks], [1]), \ + "For n_blocks>n_frames, some blocks contain != 1 frame" + + +@pytest.mark.parametrize('n_blocks', [1, 2, 3, 4, 5, 7, 10, 11]) +@pytest.mark.parametrize('start', [0, 1, 10]) +@pytest.mark.parametrize('stop', [11, 100, 256]) +@pytest.mark.parametrize('step', [None, 1, 2, 3, 5, 7]) +@pytest.mark.parametrize('scale', [1, 2]) +def test_make_balanced_slices(n_blocks, start, stop, step, scale): + return _test_make_balanced_slices(n_blocks, start, stop, step, scale) + + +def test_make_balanced_slices_step_gt_stop(n_blocks=2, start=None, + stop=5, step=6, scale=1): + return _test_make_balanced_slices(n_blocks, start, stop, step, scale) + + +@pytest.mark.parametrize('n_blocks', [1, 2]) +@pytest.mark.parametrize('start', [0, 10]) +@pytest.mark.parametrize('step', [None, 1, 2]) +def test_make_balanced_slices_empty(n_blocks, start, step): + slices = make_balanced_slices(0, n_blocks, start=start, step=step) + assert slices == [] + + +@pytest.mark.parametrize("n_frames,n_blocks,start,stop,step", + [(-1, 5, None, None, None), (5, 0, None, None, None), + (5, -1, None, None, None), (0, 0, None, None, None), + (-1, -1, None, None, None), + (5, 4, -1, None, None), (0, 5, -1, None, None), + (5, 0, -1, None, None), + (5, 4, None, -1, None), (5, 4, 3, 2, None), + (5, 4, None, None, -1), (5, 4, None, None, 0)]) +def 
test_make_balanced_slices_ValueError(n_frames, n_blocks, + start, stop, step): + with pytest.raises(ValueError): + make_balanced_slices(n_frames, n_blocks, + start=start, stop=stop, step=step) diff --git a/pmda/util.py b/pmda/util.py index c36b0604..7116e69c 100644 --- a/pmda/util.py +++ b/pmda/util.py @@ -6,20 +6,47 @@ # (see the file AUTHORS for the full list of names) # # Released under the GNU Public Licence, v2 or any higher version -""" -Utility functions --- :mod:`pmda.util` -===================================================================== +"""Utility functions --- :mod:`pmda.util` +========================================= + +This module contains helper functions and classes that can be used throughout +:mod:`pmda`. """ -from __future__ import absolute_import +from __future__ import absolute_import, division import time +import numpy as np + class timeit(object): """measure time spend in context + :class:`timeit` is a context manager (to be used with the :keyword:`with` + statement) that records the execution time for the enclosed context block + in :attr:`elapsed`. + + Attributes + ---------- + elapsed : float + Time in seconds that elapsed between entering + and exiting the context. + + Example + ------- + Use as a context manager:: + + with timeit() as total: + # code to be timed + + print(total.elapsed, "seconds") + + See Also + -------- + :func:`time.time` + """ def __enter__(self): self._start_time = time.time() @@ -30,3 +57,123 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.elapsed = end_time - self._start_time # always propagate exceptions forward return False + + +def make_balanced_slices(n_frames, n_blocks, start=None, stop=None, step=None): + """Divide `n_frames` into `n_blocks` balanced blocks. + + The blocks are generated in such a way that they contain equal numbers of + frames when possible, but there are also no empty blocks (which can happen + with a naive distribution of ``ceil(n_frames/n_blocks)`` per block and a + remainder block). + + If the trajectory is sliced in any way (``u.trajectory[start:stop:step]``) + then the appropriate values for `start`, `stop`, and `step` must be passed + to this function. Defaults can be set to ``None``. Only a subset of legal + values for slices is supported: ``0 ≤ start ≤ stop`` and ``step ≥ 1``. + + Arguments + --------- + n_frames : int + number of frames in the trajectory (≥0). This must be the + number of frames *after* the trajectory has been sliced, + i.e. ``len(u.trajectory[start:stop:step])``. If any of + `start`, `stop, and `step` are not the defaults (left empty or + set to ``None``) they must be provided as parameters. + n_blocks : int + number of blocks (>0) + start : int or None + The first index of the trajectory (default is ``None``, which + is interpreted as "first frame", i.e., 0). + stop : int or None + The index of the last frame + 1 (default is ``None``, which is + interpreted as "up to and including the last frame". + step : int or None + Step size by which the trajectory is sliced; the default is + ``None`` which corresponds to ``step=1``. + + Returns + ------- + slices : list of slice + List of length ``n_blocks`` with one :class:`slice` + for each block. + + If `n_frames` = 0 then an empty list ``[]`` is returned. + + Example + ------- + For a trajectory with 5 frames and 4 blocks we get block sizes ``[2, 1, 1, + 1]`` (instead of ``[2, 2, 1, 0]`` with the naive algorithm). + + The slices will be ``[slice(0, 2, None), slice(2, 3, None), + slice(3, 4, None), slice(4, 5, None)]``. 
+ + The indices can be used to slice a trajectory into blocks:: + + n_blocks = 5 + n_frames = len(u.trajectory[start:stop:step]) + + slices = make_balanced_slices(n_frames, n_blocks, + start=start, stop=stop, step=step) + for i_block, block in enumerate(slices): + for ts in u.trajectory[block]: + # do stuff for block number i_block + + Notes + ----- + Explanation of the algorithm: For `M` frames in the trajectory and + `N` blocks (or processes), where `i` with 0 ≤ `i` ≤ `N` - 1 is the + block number and `m[i]` is the number of frames for block `i` we + get a *balanced distribution* (one that does not contain blocks of + size 0) with the algorithm :: + + m[i] = M // N # initial frames for block i + r = M % N # remaining frames 0 ≤ r < N + for i in range(r): + m[i] += 1 # distribute the remaining frames + # over the first r blocks + + For a `step` > 1, we use ``m[i] *= step``. The last slice will + never go beyond the original `stop` if a value was provided. + + .. versionadded:: 0.2.0 + + """ + + start = int(start) if start is not None else 0 + stop = int(stop) if stop is not None else None + step = int(step) if step is not None else 1 + + if n_frames < 0: + raise ValueError("n_frames must be >= 0") + elif n_blocks < 1: + raise ValueError("n_blocks must be > 0") + elif start < 0: + raise ValueError("start must be >= 0 or None") + elif stop is not None and stop < start: + raise ValueError("stop must be >= start and >= 0 or None") + elif step < 1: + raise ValueError("step must be > 0 or None") + + if n_frames == 0: + # not very useful but allows calling code to work more gracefully + return [] + + bsizes = np.ones(n_blocks, dtype=np.int64) * n_frames // n_blocks + bsizes += (np.arange(n_blocks, dtype=np.int64) < n_frames % n_blocks) + # This can give a last index that is larger than the real last index; + # this is not a problem for slicing but it's not pretty. + # Example: original [0:20:3] -> n_frames=7, start=0, step=3: + # last frame 21 instead of 20 + bsizes *= step + idx = np.cumsum(np.concatenate(([start], bsizes))) + slices = [slice(bstart, bstop, step) + for bstart, bstop in zip(idx[:-1], idx[1:])] + + # fix very last stop index: make sure it's within trajectory range or None + # (no really critical because the slices will work regardless, but neater) + last = slices[-1] + last_stop = min(last.stop, stop) if stop is not None else stop + slices[-1] = slice(last.start, last_stop, last.step) + + return slices diff --git a/setup.py b/setup.py index c53a86c9..c11a218a 100644 --- a/setup.py +++ b/setup.py @@ -51,7 +51,9 @@ 'MDAnalysis>=0.18', 'dask>=0.18', 'six', - 'joblib', # cpu_count func currently + 'joblib', # cpu_count func currently + 'networkx', + 'scipy', ], tests_require=[ 'pytest',