From ab791ac4345b2e638f7984dd6b6b5d4631c5ec95 Mon Sep 17 00:00:00 2001 From: Max Linke Date: Thu, 20 Sep 2018 15:14:54 +0200 Subject: [PATCH 01/10] update to dask 0.18.0 --- pmda/parallel.py | 8 ++++---- setup.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 7784391e..d1c35ffc 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -291,13 +291,13 @@ def run(self, """ if scheduler is None: - scheduler = multiprocessing + scheduler = 'multiprocessing' if n_jobs == -1: n_jobs = cpu_count() if n_blocks is None: - if scheduler == multiprocessing: + if scheduler == 'multiprocessing': n_blocks = n_jobs elif isinstance(scheduler, distributed.Client): n_blocks = len(scheduler.ncores()) @@ -306,8 +306,8 @@ def run(self, "Couldn't guess ideal number of blocks from scheduler." "Please provide `n_blocks` in call to method.") - scheduler_kwargs = {'get': scheduler.get} - if scheduler == multiprocessing: + scheduler_kwargs = {'scheduler': scheduler} + if scheduler == 'multiprocessing': scheduler_kwargs['num_workers'] = n_jobs start, stop, step = self._trajectory.check_slice_indices( diff --git a/setup.py b/setup.py index 5a14ee9d..c53a86c9 100644 --- a/setup.py +++ b/setup.py @@ -49,7 +49,7 @@ packages=find_packages(), install_requires=[ 'MDAnalysis>=0.18', - 'dask', + 'dask>=0.18', 'six', 'joblib', # cpu_count func currently ], From f6980f74a9ff2d601485178b54f5b608cd022de3 Mon Sep 17 00:00:00 2001 From: Max Linke Date: Thu, 20 Sep 2018 16:05:15 +0200 Subject: [PATCH 02/10] test with strings --- pmda/test/test_parallel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pmda/test/test_parallel.py b/pmda/test/test_parallel.py index 9a874d24..d138b412 100644 --- a/pmda/test/test_parallel.py +++ b/pmda/test/test_parallel.py @@ -95,7 +95,7 @@ def scheduler(request, client): if request.param == 'distributed': return client else: - return multiprocessing + return request.param def test_scheduler(analysis, scheduler): From 56dd56a5d08d0f52c6d7661ce1744bd9f3a5c91d Mon Sep 17 00:00:00 2001 From: Max Linke Date: Thu, 20 Sep 2018 16:05:54 +0200 Subject: [PATCH 03/10] remove unused imports --- pmda/parallel.py | 2 +- pmda/test/test_parallel.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index d1c35ffc..d56bb6ac 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -19,7 +19,7 @@ from six.moves import range import MDAnalysis as mda -from dask import distributed, multiprocessing +from dask import distributed from dask.delayed import delayed from joblib import cpu_count import numpy as np diff --git a/pmda/test/test_parallel.py b/pmda/test/test_parallel.py index d138b412..974f9591 100644 --- a/pmda/test/test_parallel.py +++ b/pmda/test/test_parallel.py @@ -14,7 +14,7 @@ from MDAnalysisTests.datafiles import DCD, PSF import joblib -from dask import distributed, multiprocessing +from dask import distributed from pmda import parallel From 5103271c4f4cc9604de905f11fee27b696c7a579 Mon Sep 17 00:00:00 2001 From: Oliver Beckstein Date: Mon, 29 Oct 2018 01:04:49 -0700 Subject: [PATCH 04/10] replaced get --> scheduler in leaflet and use 'multiprocessing' as string - modified tests so that they use default scheduler - supplying n_jobs - NOTE: test_leaflets() failes for n_jobs=2; this NEEDS TO BE FIXED in a separate PR; right now this is marked as XFAIL --- pmda/leaflet.py | 18 +++++++++--------- pmda/test/test_leaflet.py | 14 +++++++++----- 2 files changed, 18 insertions(+), 14 
deletions(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 3f879ae5..bc9e94b9 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -7,8 +7,8 @@ # # Released under the GNU Public Licence, v2 or any higher version """ -LeafletFInder Analysis tool --- :mod:`pmda.leaflet` -========================================================== +LeafletFinder Analysis tool --- :mod:`pmda.leaflet` +=================================================== This module contains parallel versions of analysis tasks in :mod:`MDAnalysis.analysis.leaflet`. @@ -27,7 +27,7 @@ from scipy.spatial import cKDTree import MDAnalysis as mda -from dask import distributed, multiprocessing +from dask import distributed from joblib import cpu_count from .parallel import ParallelAnalysisBase, Timing @@ -59,8 +59,8 @@ class LeafletFinder(ParallelAnalysisBase): At the moment, this class has far fewer features than the serial version :class:`MDAnalysis.analysis.leaflet.LeafletFinder`. - This version offers Leaflet Finder algorithm 4 ("Tree-based Nearest - Neighbor and Parallel-Connected Com- ponents (Tree-Search)") in + This version offers LeafletFinder algorithm 4 ("Tree-based Nearest + Neighbor and Parallel-Connected Components (Tree-Search)") in [Paraskevakos2018]_. Currently, periodic boundaries are not taken into account. @@ -254,10 +254,10 @@ def run(self, """ if scheduler is None: - scheduler = multiprocessing + scheduler = 'multiprocessing' if n_jobs == -1: - if scheduler == multiprocessing: + if scheduler == 'multiprocessing': n_jobs = cpu_count() elif isinstance(scheduler, distributed.Client): n_jobs = len(scheduler.ncores()) @@ -269,8 +269,8 @@ def run(self, with timeit() as b_universe: universe = mda.Universe(self._top, self._traj) - scheduler_kwargs = {'get': scheduler.get} - if scheduler == multiprocessing: + scheduler_kwargs = {'scheduler': scheduler} + if scheduler == 'multiprocessing': scheduler_kwargs['num_workers'] = n_jobs start, stop, step = self._trajectory.check_slice_indices( diff --git a/pmda/test/test_leaflet.py b/pmda/test/test_leaflet.py index cf16b57b..cf8aff29 100644 --- a/pmda/test/test_leaflet.py +++ b/pmda/test/test_leaflet.py @@ -7,7 +7,6 @@ import MDAnalysis from MDAnalysisTests.datafiles import Martini_membrane_gro from MDAnalysisTests.datafiles import GRO_MEMPROT, XTC_MEMPROT -from dask import multiprocessing from pmda import leaflet import numpy as np @@ -39,24 +38,29 @@ def correct_values(self): def correct_values_single_frame(self): return [np.arange(1, 2150, 12), np.arange(2521, 4670, 12)] - def test_leaflet(self, universe, correct_values): + # XFAIL for 2 jobs needs to be fixed! 
+ @pytest.mark.parametrize('n_jobs', (1, pytest.mark.xfail(2))) + def test_leaflet(self, universe, correct_values, n_jobs): lipid_heads = universe.select_atoms("name P and resname POPG") universe.trajectory.rewind() leaflets = leaflet.LeafletFinder(universe, lipid_heads) - leaflets.run(scheduler=multiprocessing, n_jobs=1) + leaflets.run(n_jobs=n_jobs) results = [atoms.indices for atomgroup in leaflets.results for atoms in atomgroup] [assert_almost_equal(x, y, err_msg="error: leaflets should match " + "test values") for x, y in zip(results, correct_values)] + @pytest.mark.parametrize('n_jobs', (1, 2)) def test_leaflet_single_frame(self, u_one_frame, - correct_values_single_frame): + correct_values_single_frame, + n_jobs): lipid_heads = u_one_frame.select_atoms("name PO4") u_one_frame.trajectory.rewind() leaflets = leaflet.LeafletFinder(u_one_frame, - lipid_heads).run(start=0, stop=1) + lipid_heads).run(start=0, stop=1, + n_jobs=n_jobs) assert_almost_equal([atoms.indices for atomgroup in leaflets.results for atoms in atomgroup], From 19166c2a34f9e914d54fd624e91c9e4723715b19 Mon Sep 17 00:00:00 2001 From: Oliver Beckstein Date: Mon, 29 Oct 2018 01:37:57 -0700 Subject: [PATCH 05/10] tests: scheduler fixture: return 'multiprocessing' as string - passes 'multiprocessing' as the scheduler instead of multiprocessing (which does not work with dask >= 0.20 anymore) - actually passes whatever we define as parameter; only distributed is currently an exception - removed superfluous import of distributed.multiprocessing --- conftest.py | 4 ++-- pmda/test/test_parallel.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/conftest.py b/conftest.py index a2b4ee6b..8a343b4b 100644 --- a/conftest.py +++ b/conftest.py @@ -8,7 +8,7 @@ # # Released under the GNU Public Licence, v2 or any higher version -from dask import distributed, multiprocessing +from dask import distributed import pytest @@ -29,4 +29,4 @@ def scheduler(request, client): if request.param == 'distributed': return client else: - return multiprocessing + return request.param diff --git a/pmda/test/test_parallel.py b/pmda/test/test_parallel.py index 5ce82fe2..57dc74d7 100644 --- a/pmda/test/test_parallel.py +++ b/pmda/test/test_parallel.py @@ -102,15 +102,15 @@ def scheduler(request, client): def test_scheduler(analysis, scheduler): analysis.run(scheduler=scheduler) - + def test_nframes_less_nblocks_warning(analysis): u = mda.Universe(analysis._top, analysis._traj) n_frames = u.trajectory.n_frames with pytest.warns(UserWarning): analysis.run(stop=2, n_blocks=4, n_jobs=2) assert len(analysis.res) == 2 - - + + @pytest.mark.parametrize('n_blocks', np.arange(1, 11)) def test_nblocks(analysis, n_blocks): analysis.run(n_blocks=n_blocks) From 8686767b8f0ec2f135d3d6d93a88695c78f2723b Mon Sep 17 00:00:00 2001 From: Oliver Beckstein Date: Mon, 29 Oct 2018 18:10:36 -0700 Subject: [PATCH 06/10] test leaflet with n_jobs=-1 (but XFAIL it) This test is needed to get coverage of leaflet back up but: TEST or CODE needs to be fixed. --- pmda/test/test_leaflet.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pmda/test/test_leaflet.py b/pmda/test/test_leaflet.py index cf8aff29..4e8ac099 100644 --- a/pmda/test/test_leaflet.py +++ b/pmda/test/test_leaflet.py @@ -39,7 +39,9 @@ def correct_values_single_frame(self): return [np.arange(1, 2150, 12), np.arange(2521, 4670, 12)] # XFAIL for 2 jobs needs to be fixed! 
- @pytest.mark.parametrize('n_jobs', (1, pytest.mark.xfail(2))) + @pytest.mark.parametrize('n_jobs', (pytest.mark.xfail(-1), + 1, + pytest.mark.xfail(2))) def test_leaflet(self, universe, correct_values, n_jobs): lipid_heads = universe.select_atoms("name P and resname POPG") universe.trajectory.rewind() @@ -51,7 +53,7 @@ def test_leaflet(self, universe, correct_values, n_jobs): "test values") for x, y in zip(results, correct_values)] - @pytest.mark.parametrize('n_jobs', (1, 2)) + @pytest.mark.parametrize('n_jobs', (-1, 1, 2)) def test_leaflet_single_frame(self, u_one_frame, correct_values_single_frame, From d71eb98f2195db56cb37b0c96554ec1b8dc40280 Mon Sep 17 00:00:00 2001 From: Max Linke Date: Tue, 30 Oct 2018 21:55:25 +0100 Subject: [PATCH 07/10] enable new dask 0.18 scheduler selection idoms - someone should check with dask. It seems a bit brittle - fix tests maybe - update documentation fixes #17 --- conftest.py | 9 ++++--- docs/userguide/parallelization.rst | 6 ++--- docs/userguide/pmda_classes.rst | 2 +- pmda/leaflet.py | 43 +++++++++++++++++++++--------- pmda/parallel.py | 29 ++++++++++++++------ pmda/test/test_custom.py | 6 ++--- pmda/test/test_parallel.py | 15 +---------- 7 files changed, 65 insertions(+), 45 deletions(-) diff --git a/conftest.py b/conftest.py index 8a343b4b..d5298904 100644 --- a/conftest.py +++ b/conftest.py @@ -9,6 +9,7 @@ # Released under the GNU Public Licence, v2 or any higher version from dask import distributed +import dask import pytest @@ -24,9 +25,11 @@ def client(tmpdir_factory, request): lc.close() -@pytest.fixture(scope='session', params=('distributed', 'multiprocessing')) +@pytest.fixture(scope='session', params=('distributed', 'multiprocessing', 'single-threaded')) def scheduler(request, client): if request.param == 'distributed': - return client + arg = client else: - return request.param + arg = request.param + with dask.config.set(scheduler=arg): + yield diff --git a/docs/userguide/parallelization.rst b/docs/userguide/parallelization.rst index 73f39a67..e76f2ac8 100644 --- a/docs/userguide/parallelization.rst +++ b/docs/userguide/parallelization.rst @@ -19,7 +19,7 @@ Internally, this uses the multiprocessing `scheduler`_ of dask. If you want to make use of more advanced scheduler features or scale your analysis to multiple nodes, e.g., in an HPC (high performance computing) environment, then use the :mod:`distributed` scheduler, as -described next. +described next. If ``n_jobs==1`` use a single threaded scheduler. .. _`scheduler`: https://dask.pydata.org/en/latest/scheduler-overview.html @@ -58,7 +58,7 @@ use the :ref:`RMSD example`): .. code:: python - rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run(scheduler=client) + rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run() Because the local cluster contains 8 workers, the RMSD trajectory analysis will be parallelized over 8 trajectory segments. @@ -78,7 +78,7 @@ analysis :meth:`~pmda.parallel.ParallelAnalysisBase.run` method: import distributed client = distributed.Client('192.168.0.1:8786') - rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run(scheduler=client) + rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run() In this way one can spread an analysis task over many different nodes. diff --git a/docs/userguide/pmda_classes.rst b/docs/userguide/pmda_classes.rst index 38681911..13ea3e67 100644 --- a/docs/userguide/pmda_classes.rst +++ b/docs/userguide/pmda_classes.rst @@ -17,7 +17,7 @@ are provided as keyword arguments: set up the parallel analysis - .. method:: run(n_jobs=-1, scheduler=None) + .. 
method:: run(n_jobs=-1) perform parallel analysis; see :ref:`parallelization` for explanation of the arguments diff --git a/pmda/leaflet.py b/pmda/leaflet.py index bc9e94b9..a6ad8e2d 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -231,7 +231,6 @@ def run(self, start=None, stop=None, step=None, - scheduler=None, n_jobs=-1, cutoff=15.0): """Perform the calculation @@ -244,35 +243,53 @@ def run(self, stop frame of analysis step : int, optional number of frames to skip between each analysed frame - scheduler : dask scheduler, optional - Use dask scheduler, defaults to multiprocessing. This can be used - to spread work to a distributed scheduler n_jobs : int, optional number of tasks to start, if `-1` use number of logical cpu cores. This argument will be ignored when the distributed scheduler is used """ - if scheduler is None: + # are we using a distributed scheduler or should we use multiprocessing? + scheduler = dask.config.get('scheduler', None) + if scheduler is None and client is None: scheduler = 'multiprocessing' + elif scheduler is None: + # maybe we can grab a global worker + try: + from dask import distributed + scheduler = distributed.worker.get_client() + except ValueError: + pass + except ImportError: + pass if n_jobs == -1: + n_jobs = cpu_count() + + # we could not find a global scheduler to use and we ask for a single + # job. Therefore we run this on the single threaded scheduler for + # debugging. + if scheduler is None and n_jobs == 1: + scheduler = 'single-threaded' + + if n_blocks is None: if scheduler == 'multiprocessing': - n_jobs = cpu_count() + n_blocks = n_jobs elif isinstance(scheduler, distributed.Client): - n_jobs = len(scheduler.ncores()) + n_blocks = len(scheduler.ncores()) else: - raise ValueError( - "Couldn't guess ideal number of jobs from scheduler." - "Please provide `n_jobs` in call to method.") - - with timeit() as b_universe: - universe = mda.Universe(self._top, self._traj) + n_blocks = 1 + warnings.warn( + "Couldn't guess ideal number of blocks from scheduler. Set n_blocks=1" + "Please provide `n_blocks` in call to method.") scheduler_kwargs = {'scheduler': scheduler} if scheduler == 'multiprocessing': scheduler_kwargs['num_workers'] = n_jobs + with timeit() as b_universe: + universe = mda.Universe(self._top, self._traj) + start, stop, step = self._trajectory.check_slice_indices( start, stop, step) with timeit() as total: diff --git a/pmda/parallel.py b/pmda/parallel.py index 0c5d9fa0..8917e5f6 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -21,7 +21,6 @@ from six.moves import range import MDAnalysis as mda -from dask import distributed from dask.delayed import delayed from joblib import cpu_count import numpy as np @@ -267,7 +266,6 @@ def run(self, start=None, stop=None, step=None, - scheduler=None, n_jobs=1, n_blocks=None): """Perform the calculation @@ -280,9 +278,6 @@ def run(self, stop frame of analysis step : int, optional number of frames to skip between each analysed frame - scheduler : dask scheduler, optional - Use dask scheduler, defaults to multiprocessing. This can be used - to spread work to a distributed scheduler n_jobs : int, optional number of jobs to start, if `-1` use number of logical cpu cores. This argument will be ignored when the distributed scheduler is @@ -292,20 +287,38 @@ def run(self, to n_jobs or number of available workers in scheduler. """ - if scheduler is None: + # are we using a distributed scheduler or should we use multiprocessing? 
+ scheduler = dask.config.get('scheduler', None) + if scheduler is None and client is None: scheduler = 'multiprocessing' + elif scheduler is None: + # maybe we can grab a global worker + try: + from dask import distributed + scheduler = distributed.worker.get_client() + except ValueError: + pass + except ImportError: + pass if n_jobs == -1: n_jobs = cpu_count() + # we could not find a global scheduler to use and we ask for a single + # job. Therefore we run this on the single threaded scheduler for + # debugging. + if scheduler is None and n_jobs == 1: + scheduler = 'single-threaded' + if n_blocks is None: if scheduler == 'multiprocessing': n_blocks = n_jobs elif isinstance(scheduler, distributed.Client): n_blocks = len(scheduler.ncores()) else: - raise ValueError( - "Couldn't guess ideal number of blocks from scheduler." + n_blocks = 1 + warnings.warn( + "Couldn't guess ideal number of blocks from scheduler. Set n_blocks=1" "Please provide `n_blocks` in call to method.") scheduler_kwargs = {'scheduler': scheduler} diff --git a/pmda/test/test_custom.py b/pmda/test/test_custom.py index e29dcb97..96d5169f 100644 --- a/pmda/test/test_custom.py +++ b/pmda/test/test_custom.py @@ -27,13 +27,13 @@ def test_AnalysisFromFunction(scheduler): u = mda.Universe(PSF, DCD) step = 2 ana1 = custom.AnalysisFromFunction(custom_function, u, u.atoms).run( - step=step, scheduler=scheduler + step=step ) ana2 = custom.AnalysisFromFunction(custom_function, u, u.atoms).run( - step=step, scheduler=scheduler + step=step ) ana3 = custom.AnalysisFromFunction(custom_function, u, u.atoms).run( - step=step, scheduler=scheduler + step=step ) results = [] diff --git a/pmda/test/test_parallel.py b/pmda/test/test_parallel.py index 57dc74d7..e25aa46a 100644 --- a/pmda/test/test_parallel.py +++ b/pmda/test/test_parallel.py @@ -60,11 +60,6 @@ def analysis(): return ana -def test_wrong_scheduler(analysis): - with pytest.raises(ValueError): - analysis.run(scheduler=2) - - @pytest.mark.parametrize('n_jobs', (1, 2)) def test_all_frames(analysis, n_jobs): analysis.run(n_jobs=n_jobs) @@ -91,16 +86,8 @@ def test_no_frames(analysis, n_jobs): assert analysis.timing.universe == 0 -@pytest.fixture(scope='session', params=('distributed', 'multiprocessing')) -def scheduler(request, client): - if request.param == 'distributed': - return client - else: - return request.param - - def test_scheduler(analysis, scheduler): - analysis.run(scheduler=scheduler) + analysis.run() def test_nframes_less_nblocks_warning(analysis): From 6f545467db89f604e8d968bdcedb2a2c397f222c Mon Sep 17 00:00:00 2001 From: Max Linke Date: Tue, 30 Oct 2018 22:29:08 +0100 Subject: [PATCH 08/10] fix missing import --- pmda/leaflet.py | 2 +- pmda/parallel.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index a6ad8e2d..9e28d4ad 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -27,7 +27,7 @@ from scipy.spatial import cKDTree import MDAnalysis as mda -from dask import distributed +import dask from joblib import cpu_count from .parallel import ParallelAnalysisBase, Timing diff --git a/pmda/parallel.py b/pmda/parallel.py index 8917e5f6..357bad61 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -22,6 +22,7 @@ import MDAnalysis as mda from dask.delayed import delayed +import dask from joblib import cpu_count import numpy as np From 51c1b5d40fff5092dd4945b9a3b75d928d4a6a67 Mon Sep 17 00:00:00 2001 From: Oliver Beckstein Date: Tue, 30 Oct 2018 17:48:49 -0700 Subject: [PATCH 09/10] respect new dask.config global 
selection of scheduler - fix #48 - updated boiler-plate code in ParallelAnalysisBase.run and copied and pasted into leaflet.LeafletFinder.run() (TODO: makes this more DRY) - dask.distributed added as dependency (it is recommended by dask for a single node anyway, and it avoids imports inside if statements... much cleaner code in PMDA) - removed scheduler kwarg: use dask.config.set(scheduler=...) - 'multiprocessing' and n_jobs=-1 are now only selected if nothing is set by dask; if one wants n_jobs=-1 to always grab all cores then you must set the multiprocessing scheduler - default for n_jobs=1 (instead of -1), i.e., the single threaded scheduler - updated tests - removed unnecessary broken(?) test for "no deprecations" in parallel.ParallelAnalysisBase - updated CHANGELOG --- .travis.yml | 2 +- CHANGELOG | 11 ++++++++++- conftest.py | 4 +++- pmda/leaflet.py | 25 +++++++------------------ pmda/parallel.py | 22 ++++++++++++---------- pmda/test/test_custom.py | 16 ---------------- pmda/test/test_parallel.py | 5 +++-- setup.py | 3 ++- 8 files changed, 38 insertions(+), 50 deletions(-) diff --git a/.travis.yml b/.travis.yml index 93807f41..76690594 100644 --- a/.travis.yml +++ b/.travis.yml @@ -20,7 +20,7 @@ env: # minimal CONDA_MDANALYSIS_DEPENDENCIES #- CONDA_DEPENDENCIES="mdanalysis mdanalysistests dask joblib pytest-pep8 mock codecov cython hypothesis sphinx" - CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis" - - CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask joblib pytest-pep8 mock codecov" + - CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask distributed joblib pytest-pep8 mock codecov" - CONDA_CHANNELS='conda-forge' - CONDA_CHANNEL_PRIORITY=True # install development version of MDAnalysis (needed until the test diff --git a/CHANGELOG b/CHANGELOG index a7c18fd2..f473d050 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -13,7 +13,7 @@ The rules for this file: * release numbers follow "Semantic Versioning" http://semver.org ------------------------------------------------------------------------------ -xx/xx/18 VOD555, richardjgowers, iparask, orbeckst +11/xx/18 VOD555, richardjgowers, iparask, orbeckst, kain88-de * 0.2.0 @@ -27,6 +27,15 @@ Fixes * always distribute frames over blocks so that no empty blocks are created ("balanced blocks", Issue #71) +Changes + * requires dask >= 0.18.0 and respects/requires globally setting of the dask + scheduler (Issue #48) + * removed the 'scheduler' keyword from the run() method; use + dask.config.set(scheduler=...) as recommended in the dask docs + * uses single-threaaded scheduler if n_jobs=1 (Issue #17) + * n_jobs=1 is now the default for run() (used to be n_jobs=-1) + * dask.distributed is now a dependency + 06/07/18 orbeckst diff --git a/conftest.py b/conftest.py index d5298904..54e3f63f 100644 --- a/conftest.py +++ b/conftest.py @@ -25,7 +25,9 @@ def client(tmpdir_factory, request): lc.close() -@pytest.fixture(scope='session', params=('distributed', 'multiprocessing', 'single-threaded')) +@pytest.fixture(scope='session', params=('distributed', + 'multiprocessing', + 'single-threaded')) def scheduler(request, client): if request.param == 'distributed': arg = client diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 9e28d4ad..ec648b5e 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -249,19 +249,15 @@ def run(self, used """ - # are we using a distributed scheduler or should we use multiprocessing? 
+ # are we using a distributed scheduler or should we use + # multiprocessing? scheduler = dask.config.get('scheduler', None) - if scheduler is None and client is None: - scheduler = 'multiprocessing' - elif scheduler is None: + if scheduler is None: # maybe we can grab a global worker try: - from dask import distributed - scheduler = distributed.worker.get_client() + scheduler = dask.distributed.worker.get_client() except ValueError: pass - except ImportError: - pass if n_jobs == -1: n_jobs = cpu_count() @@ -272,16 +268,9 @@ def run(self, if scheduler is None and n_jobs == 1: scheduler = 'single-threaded' - if n_blocks is None: - if scheduler == 'multiprocessing': - n_blocks = n_jobs - elif isinstance(scheduler, distributed.Client): - n_blocks = len(scheduler.ncores()) - else: - n_blocks = 1 - warnings.warn( - "Couldn't guess ideal number of blocks from scheduler. Set n_blocks=1" - "Please provide `n_blocks` in call to method.") + # fall back to multiprocessing, we tried everything + if scheduler is None: + scheduler = 'multiprocessing' scheduler_kwargs = {'scheduler': scheduler} if scheduler == 'multiprocessing': diff --git a/pmda/parallel.py b/pmda/parallel.py index 357bad61..f595740c 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -23,6 +23,7 @@ import MDAnalysis as mda from dask.delayed import delayed import dask +import dask.distributed from joblib import cpu_count import numpy as np @@ -288,19 +289,15 @@ def run(self, to n_jobs or number of available workers in scheduler. """ - # are we using a distributed scheduler or should we use multiprocessing? + # are we using a distributed scheduler or should we use + # multiprocessing? scheduler = dask.config.get('scheduler', None) - if scheduler is None and client is None: - scheduler = 'multiprocessing' - elif scheduler is None: + if scheduler is None: # maybe we can grab a global worker try: - from dask import distributed - scheduler = distributed.worker.get_client() + scheduler = dask.distributed.worker.get_client() except ValueError: pass - except ImportError: - pass if n_jobs == -1: n_jobs = cpu_count() @@ -311,15 +308,20 @@ def run(self, if scheduler is None and n_jobs == 1: scheduler = 'single-threaded' + # fall back to multiprocessing, we tried everything + if scheduler is None: + scheduler = 'multiprocessing' + if n_blocks is None: if scheduler == 'multiprocessing': n_blocks = n_jobs - elif isinstance(scheduler, distributed.Client): + elif isinstance(scheduler, dask.distributed.Client): n_blocks = len(scheduler.ncores()) else: n_blocks = 1 warnings.warn( - "Couldn't guess ideal number of blocks from scheduler. Set n_blocks=1" + "Couldn't guess ideal number of blocks from scheduler. " + "Setting n_blocks=1. 
" "Please provide `n_blocks` in call to method.") scheduler_kwargs = {'scheduler': scheduler} diff --git a/pmda/test/test_custom.py b/pmda/test/test_custom.py index 96d5169f..3f8c9767 100644 --- a/pmda/test/test_custom.py +++ b/pmda/test/test_custom.py @@ -12,7 +12,6 @@ import numpy as np import MDAnalysis as mda from MDAnalysisTests.datafiles import PSF, DCD -from MDAnalysisTests.util import no_deprecated_call import pytest from numpy.testing import assert_equal @@ -81,18 +80,3 @@ def test_analysis_class(): assert_equal(results, ana.results) with pytest.raises(ValueError): ana_class(2) - - -def test_analysis_class_decorator(): - # Issue #1511 - # analysis_class should not raise - # a DeprecationWarning - u = mda.Universe(PSF, DCD) - - def distance(a, b): - return np.linalg.norm((a.centroid() - b.centroid())) - - Distances = custom.analysis_class(distance) - - with no_deprecated_call(): - Distances(u, u.atoms[:10], u.atoms[10:20]).run() diff --git a/pmda/test/test_parallel.py b/pmda/test/test_parallel.py index e25aa46a..0e966eaa 100644 --- a/pmda/test/test_parallel.py +++ b/pmda/test/test_parallel.py @@ -14,7 +14,7 @@ from MDAnalysisTests.datafiles import DCD, PSF import joblib -from dask import distributed +import dask from pmda import parallel @@ -105,7 +105,8 @@ def test_nblocks(analysis, n_blocks): def test_guess_nblocks(analysis): - analysis.run(n_jobs=-1) + with dask.config.set(scheduler='multiprocessing'): + analysis.run(n_jobs=-1) assert len(analysis._results) == joblib.cpu_count() diff --git a/setup.py b/setup.py index c11a218a..4ffe4d72 100644 --- a/setup.py +++ b/setup.py @@ -50,10 +50,11 @@ install_requires=[ 'MDAnalysis>=0.18', 'dask>=0.18', + 'distributed', 'six', 'joblib', # cpu_count func currently 'networkx', - 'scipy', + 'scipy', ], tests_require=[ 'pytest', From dd888675b16c66319535a7b3ebb07f99e8d73884 Mon Sep 17 00:00:00 2001 From: Oliver Beckstein Date: Tue, 30 Oct 2018 18:11:27 -0700 Subject: [PATCH 10/10] bumped MDAnalysis requirement to 0.19.0 - install conda package of MDA on travis - require MDA and MDATests >= 0.19.0 --- .travis.yml | 5 +++-- setup.py | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 76690594..7c292b18 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,13 +19,14 @@ env: # mdanalysis develop from source (see below), which needs # minimal CONDA_MDANALYSIS_DEPENDENCIES #- CONDA_DEPENDENCIES="mdanalysis mdanalysistests dask joblib pytest-pep8 mock codecov cython hypothesis sphinx" - - CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis" + #- CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis" + - CONDA_MDANALYSIS_DEPENDENCIES="mdanalysis mdanalysistests" - CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask distributed joblib pytest-pep8 mock codecov" - CONDA_CHANNELS='conda-forge' - CONDA_CHANNEL_PRIORITY=True # install development version of MDAnalysis (needed until the test # files for analysis.rdf are available in release 0.19.0) - - PIP_DEPENDENCIES="git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysis&subdirectory=package git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysistests&subdirectory=testsuite" + #- PIP_DEPENDENCIES="git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysis&subdirectory=package git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysistests&subdirectory=testsuite" - NUMPY_VERSION=stable - BUILD_CMD='python setup.py develop' - 
CODECOV=false diff --git a/setup.py b/setup.py index 4ffe4d72..efb15188 100644 --- a/setup.py +++ b/setup.py @@ -48,7 +48,7 @@ }, packages=find_packages(), install_requires=[ - 'MDAnalysis>=0.18', + 'MDAnalysis>=0.19.0', 'dask>=0.18', 'distributed', 'six', @@ -58,5 +58,5 @@ ], tests_require=[ 'pytest', - 'MDAnalysisTests>=0.18', # keep + 'MDAnalysisTests>=0.19.0', # keep ], )
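
Usage sketch for the new scheduler selection (a minimal, illustrative example only: it follows the updated conftest.py and user guide in this series, which show dask.config.set(...) and pmda's rms.RMSD analysis; the topology/trajectory file names and the cluster address are placeholders). After this series, run() no longer accepts a `scheduler` keyword; the scheduler is chosen through dask's global configuration, a registered distributed client, or falls back to 'single-threaded'/'multiprocessing' as in the new run() boiler-plate:

    import dask
    import distributed
    import MDAnalysis as mda
    from pmda import rms

    u = mda.Universe("topol.psf", "traj.dcd")     # placeholder file names
    ref = mda.Universe("topol.psf", "traj.dcd")

    # multiprocessing scheduler for this call only; with it, n_jobs=-1
    # uses all logical cores (cf. test_guess_nblocks)
    with dask.config.set(scheduler='multiprocessing'):
        rms.RMSD(u.atoms, ref.atoms).run(n_jobs=-1)

    # distributed scheduler: register the client globally (as conftest.py
    # does); run() then derives n_blocks from the client's worker count
    client = distributed.Client()   # or distributed.Client('192.168.0.1:8786')
    with dask.config.set(scheduler=client):
        rms.RMSD(u.atoms, ref.atoms).run()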