update to dask 0.18.0 #66

Merged
merged 11 commits on Nov 1, 2018
7 changes: 4 additions & 3 deletions .travis.yml
@@ -19,13 +19,14 @@ env:
# mdanalysis develop from source (see below), which needs
# minimal CONDA_MDANALYSIS_DEPENDENCIES
#- CONDA_DEPENDENCIES="mdanalysis mdanalysistests dask joblib pytest-pep8 mock codecov cython hypothesis sphinx"
- CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis"
- CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask joblib pytest-pep8 mock codecov"
#- CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis"
- CONDA_MDANALYSIS_DEPENDENCIES="mdanalysis mdanalysistests"
- CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask distributed joblib pytest-pep8 mock codecov"
- CONDA_CHANNELS='conda-forge'
- CONDA_CHANNEL_PRIORITY=True
# install development version of MDAnalysis (needed until the test
# files for analysis.rdf are available in release 0.19.0)
- PIP_DEPENDENCIES="git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysis&subdirectory=package git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysistests&subdirectory=testsuite"
#- PIP_DEPENDENCIES="git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysis&subdirectory=package git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysistests&subdirectory=testsuite"
- NUMPY_VERSION=stable
- BUILD_CMD='python setup.py develop'
- CODECOV=false
11 changes: 10 additions & 1 deletion CHANGELOG
@@ -13,7 +13,7 @@ The rules for this file:
* release numbers follow "Semantic Versioning" http://semver.org

------------------------------------------------------------------------------
xx/xx/18 VOD555, richardjgowers, iparask, orbeckst
11/xx/18 VOD555, richardjgowers, iparask, orbeckst, kain88-de

* 0.2.0

@@ -27,6 +27,15 @@ Fixes
* always distribute frames over blocks so that no empty blocks are
created ("balanced blocks", Issue #71)

Changes
* requires dask >= 0.18.0 and respects/requires global setting of the dask
  scheduler (Issue #48)
* removed the 'scheduler' keyword from the run() method; use
dask.config.set(scheduler=...) as recommended in the dask docs
* uses single-threaaded scheduler if n_jobs=1 (Issue #17)
Member: typo, needs fixing

* n_jobs=1 is now the default for run() (used to be n_jobs=-1)
* dask.distributed is now a dependency
Member: setup.py has it as full dep; it could be moved into test dependencies if you really want to keep it optional. If you make it fully optional, please remove this line from CHANGELOG.
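A hedged sketch of the reviewer's suggestion, assuming a conventional setuptools `setup.py` (the real file is not part of this diff, so package names and version pins below are illustrative only):

```python
# Illustrative only: keep dask as a hard dependency, but make distributed a
# test-only extra so that `pip install pmda[test]` pulls it in for the test suite.
from setuptools import setup, find_packages

setup(
    name='pmda',
    packages=find_packages(),
    install_requires=['MDAnalysis>=0.18.0', 'dask>=0.18.0', 'joblib', 'numpy'],
    extras_require={'test': ['pytest', 'dask[distributed]']},
)
```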



06/07/18 orbeckst

13 changes: 9 additions & 4 deletions conftest.py
@@ -8,7 +8,8 @@
#
# Released under the GNU Public Licence, v2 or any higher version

from dask import distributed, multiprocessing
from dask import distributed
Member: tests require distributed

import dask
import pytest


@@ -24,9 +25,13 @@ def client(tmpdir_factory, request):
lc.close()


@pytest.fixture(scope='session', params=('distributed', 'multiprocessing'))
@pytest.fixture(scope='session', params=('distributed',
'multiprocessing',
'single-threaded'))
def scheduler(request, client):
if request.param == 'distributed':
return client
arg = client
else:
return multiprocessing
arg = request.param
with dask.config.set(scheduler=arg):
yield
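For illustration, a hypothetical test that relies on this fixture: because the fixture yields inside `dask.config.set(scheduler=...)`, `run()` picks the scheduler up from the global dask config and the test body runs once per parametrized scheduler. The RMSD class, the data files, and the assertion on `ana.rmsd` are plausible examples, not part of this diff.

```python
import MDAnalysis as mda
from MDAnalysisTests.datafiles import PSF, DCD

from pmda import rms


def test_rmsd_with_configured_scheduler(scheduler):
    # `scheduler` only sets the dask config; run() reads it via dask.config
    u = mda.Universe(PSF, DCD)
    ref = mda.Universe(PSF, DCD)
    ana = rms.RMSD(u.atoms, ref.atoms).run(n_jobs=2)
    assert len(ana.rmsd) == u.trajectory.n_frames
```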
6 changes: 3 additions & 3 deletions docs/userguide/parallelization.rst
@@ -19,7 +19,7 @@ Internally, this uses the multiprocessing `scheduler`_ of dask. If you
want to make use of more advanced scheduler features or scale your
analysis to multiple nodes, e.g., in an HPC (high performance
computing) environment, then use the :mod:`distributed` scheduler, as
described next.
described next. If ``n_jobs == 1``, a single-threaded scheduler is used.

.. _`scheduler`:
https://dask.pydata.org/en/latest/scheduler-overview.html
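For example (a sketch only; ``u`` and ``ref`` are assumed to be MDAnalysis Universes as in the RMSD example referenced below):

.. code:: python

    from pmda import rms

    # default multiprocessing scheduler, parallelized over 4 worker processes
    rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run(n_jobs=4)

    # n_jobs=1 falls back to dask's single-threaded scheduler (handy for debugging)
    rmsd_serial = rms.RMSD(u.atoms, ref.atoms).run(n_jobs=1)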
@@ -58,7 +58,7 @@ use the :ref:`RMSD example<example-parallel-rmsd>`):

.. code:: python

rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run(scheduler=client)
rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run()

Because the local cluster contains 8 workers, the RMSD trajectory
analysis will be parallelized over 8 trajectory segments.
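A sketch of the local-cluster setup this paragraph refers to; the actual setup lines are collapsed out of this hunk, so the exact call is an assumption:

.. code:: python

    import distributed

    lc = distributed.LocalCluster(n_workers=8, processes=True)
    client = distributed.Client(lc)

    # run() now detects the active client automatically
    rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run()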
@@ -78,7 +78,7 @@ analysis :meth:`~pmda.parallel.ParallelAnalysisBase.run` method:

import distributed
client = distributed.Client('192.168.0.1:8786')
rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run(scheduler=client)
rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run()

In this way one can spread an analysis task over many different nodes.
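Equivalently, the client can be registered explicitly through dask's global configuration, which is the mechanism this PR standardizes on (a sketch, reusing the address from the example above):

.. code:: python

    import dask
    import distributed

    client = distributed.Client('192.168.0.1:8786')
    with dask.config.set(scheduler=client):
        rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run()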

2 changes: 1 addition & 1 deletion docs/userguide/pmda_classes.rst
@@ -17,7 +17,7 @@ are provided as keyword arguments:

set up the parallel analysis

.. method:: run(n_jobs=-1, scheduler=None)
.. method:: run(n_jobs=-1)
Member: I think the default is now n_jobs=1, isn't it?


perform parallel analysis; see :ref:`parallelization`
for explanation of the arguments
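A minimal usage sketch (the file names and the choice of analysis class are illustrative):

.. code:: python

    import MDAnalysis as mda
    from pmda import rms

    u = mda.Universe('topol.tpr', 'traj.xtc')
    ref = mda.Universe('topol.tpr', 'traj.xtc')
    rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run(n_jobs=4)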
50 changes: 28 additions & 22 deletions pmda/leaflet.py
@@ -7,8 +7,8 @@
#
# Released under the GNU Public Licence, v2 or any higher version
"""
LeafletFInder Analysis tool --- :mod:`pmda.leaflet`
==========================================================
LeafletFinder Analysis tool --- :mod:`pmda.leaflet`
===================================================

This module contains parallel versions of analysis tasks in
:mod:`MDAnalysis.analysis.leaflet`.
@@ -27,7 +27,7 @@
from scipy.spatial import cKDTree

import MDAnalysis as mda
from dask import distributed, multiprocessing
import dask
from joblib import cpu_count

from .parallel import ParallelAnalysisBase, Timing
@@ -59,8 +59,8 @@ class LeafletFinder(ParallelAnalysisBase):
At the moment, this class has far fewer features than the serial
version :class:`MDAnalysis.analysis.leaflet.LeafletFinder`.

This version offers Leaflet Finder algorithm 4 ("Tree-based Nearest
Neighbor and Parallel-Connected Com- ponents (Tree-Search)") in
This version offers LeafletFinder algorithm 4 ("Tree-based Nearest
Neighbor and Parallel-Connected Components (Tree-Search)") in
[Paraskevakos2018]_.

Currently, periodic boundaries are not taken into account.
@@ -231,7 +231,6 @@ def run(self,
start=None,
stop=None,
step=None,
scheduler=None,
n_jobs=-1,
cutoff=15.0):
"""Perform the calculation
@@ -244,35 +243,42 @@
stop frame of analysis
step : int, optional
number of frames to skip between each analysed frame
scheduler : dask scheduler, optional
Use dask scheduler, defaults to multiprocessing. This can be used
to spread work to a distributed scheduler
n_jobs : int, optional
number of tasks to start, if `-1` use number of logical cpu cores.
This argument will be ignored when the distributed scheduler is
used

"""
# are we using a distributed scheduler or should we use
# multiprocessing?
scheduler = dask.config.get('scheduler', None)
if scheduler is None:
scheduler = multiprocessing
# maybe we can grab a global worker
try:
scheduler = dask.distributed.worker.get_client()
except ValueError:
pass

if n_jobs == -1:
if scheduler == multiprocessing:
n_jobs = cpu_count()
elif isinstance(scheduler, distributed.Client):
n_jobs = len(scheduler.ncores())
else:
raise ValueError(
"Couldn't guess ideal number of jobs from scheduler."
"Please provide `n_jobs` in call to method.")
n_jobs = cpu_count()

with timeit() as b_universe:
universe = mda.Universe(self._top, self._traj)
# we could not find a global scheduler to use and we ask for a single
# job. Therefore we run this on the single threaded scheduler for
# debugging.
if scheduler is None and n_jobs == 1:
scheduler = 'single-threaded'

# fall back to multiprocessing, we tried everything
if scheduler is None:
scheduler = 'multiprocessing'

scheduler_kwargs = {'get': scheduler.get}
if scheduler == multiprocessing:
scheduler_kwargs = {'scheduler': scheduler}
if scheduler == 'multiprocessing':
scheduler_kwargs['num_workers'] = n_jobs

with timeit() as b_universe:
universe = mda.Universe(self._top, self._traj)

start, stop, step = self._trajectory.check_slice_indices(
start, stop, step)
with timeit() as total:
40 changes: 28 additions & 12 deletions pmda/parallel.py
@@ -21,8 +21,9 @@
from six.moves import range

import MDAnalysis as mda
from dask import distributed, multiprocessing
from dask.delayed import delayed
import dask
import dask.distributed
from joblib import cpu_count
import numpy as np

@@ -267,7 +268,6 @@ def run(self,
start=None,
stop=None,
step=None,
scheduler=None,
n_jobs=1,
n_blocks=None):
"""Perform the calculation
@@ -280,9 +280,6 @@
stop frame of analysis
step : int, optional
number of frames to skip between each analysed frame
scheduler : dask scheduler, optional
Use dask scheduler, defaults to multiprocessing. This can be used
to spread work to a distributed scheduler
n_jobs : int, optional
number of jobs to start, if `-1` use number of logical cpu cores.
This argument will be ignored when the distributed scheduler is
@@ -292,24 +289,43 @@
to n_jobs or number of available workers in scheduler.

"""
# are we using a distributed scheduler or should we use
# multiprocessing?
scheduler = dask.config.get('scheduler', None)
if scheduler is None:
scheduler = multiprocessing
# maybe we can grab a global worker
try:
scheduler = dask.distributed.worker.get_client()
except ValueError:
pass

if n_jobs == -1:
n_jobs = cpu_count()

# we could not find a global scheduler to use and we ask for a single
# job. Therefore we run this on the single threaded scheduler for
# debugging.
if scheduler is None and n_jobs == 1:
scheduler = 'single-threaded'
Member Author: fixes #17

Member: nice


# fall back to multiprocessing, we tried everything
if scheduler is None:
scheduler = 'multiprocessing'

if n_blocks is None:
if scheduler == multiprocessing:
if scheduler == 'multiprocessing':
n_blocks = n_jobs
elif isinstance(scheduler, distributed.Client):
elif isinstance(scheduler, dask.distributed.Client):
n_blocks = len(scheduler.ncores())
else:
raise ValueError(
"Couldn't guess ideal number of blocks from scheduler."
n_blocks = 1
warnings.warn(
"Couldn't guess ideal number of blocks from scheduler. "
"Setting n_blocks=1. "
"Please provide `n_blocks` in call to method.")

scheduler_kwargs = {'get': scheduler.get}
if scheduler == multiprocessing:
scheduler_kwargs = {'scheduler': scheduler}
if scheduler == 'multiprocessing':
scheduler_kwargs['num_workers'] = n_jobs

start, stop, step = self._trajectory.check_slice_indices(
22 changes: 3 additions & 19 deletions pmda/test/test_custom.py
@@ -12,7 +12,6 @@
import numpy as np
import MDAnalysis as mda
from MDAnalysisTests.datafiles import PSF, DCD
from MDAnalysisTests.util import no_deprecated_call
import pytest
from numpy.testing import assert_equal

@@ -27,13 +26,13 @@ def test_AnalysisFromFunction(scheduler):
u = mda.Universe(PSF, DCD)
step = 2
ana1 = custom.AnalysisFromFunction(custom_function, u, u.atoms).run(
step=step, scheduler=scheduler
step=step
)
ana2 = custom.AnalysisFromFunction(custom_function, u, u.atoms).run(
step=step, scheduler=scheduler
step=step
)
ana3 = custom.AnalysisFromFunction(custom_function, u, u.atoms).run(
step=step, scheduler=scheduler
step=step
)

results = []
@@ -81,18 +80,3 @@ def test_analysis_class():
assert_equal(results, ana.results)
with pytest.raises(ValueError):
ana_class(2)


def test_analysis_class_decorator():
# Issue #1511
# analysis_class should not raise
# a DeprecationWarning
u = mda.Universe(PSF, DCD)

def distance(a, b):
return np.linalg.norm((a.centroid() - b.centroid()))

Distances = custom.analysis_class(distance)

with no_deprecated_call():
Distances(u, u.atoms[:10], u.atoms[10:20]).run()
16 changes: 11 additions & 5 deletions pmda/test/test_leaflet.py
@@ -7,7 +7,6 @@
import MDAnalysis
from MDAnalysisTests.datafiles import Martini_membrane_gro
from MDAnalysisTests.datafiles import GRO_MEMPROT, XTC_MEMPROT
from dask import multiprocessing
from pmda import leaflet
import numpy as np

@@ -39,24 +38,31 @@ def correct_values(self):
def correct_values_single_frame(self):
return [np.arange(1, 2150, 12), np.arange(2521, 4670, 12)]

def test_leaflet(self, universe, correct_values):
# XFAIL for 2 jobs needs to be fixed!
@pytest.mark.parametrize('n_jobs', (pytest.mark.xfail(-1),
1,
pytest.mark.xfail(2)))
def test_leaflet(self, universe, correct_values, n_jobs):
lipid_heads = universe.select_atoms("name P and resname POPG")
universe.trajectory.rewind()
leaflets = leaflet.LeafletFinder(universe, lipid_heads)
leaflets.run(scheduler=multiprocessing, n_jobs=1)
leaflets.run(n_jobs=n_jobs)
results = [atoms.indices for atomgroup in leaflets.results
for atoms in atomgroup]
[assert_almost_equal(x, y, err_msg="error: leaflets should match " +
"test values") for x, y in
zip(results, correct_values)]

@pytest.mark.parametrize('n_jobs', (-1, 1, 2))
def test_leaflet_single_frame(self,
u_one_frame,
correct_values_single_frame):
correct_values_single_frame,
n_jobs):
lipid_heads = u_one_frame.select_atoms("name PO4")
u_one_frame.trajectory.rewind()
leaflets = leaflet.LeafletFinder(u_one_frame,
lipid_heads).run(start=0, stop=1)
lipid_heads).run(start=0, stop=1,
n_jobs=n_jobs)

assert_almost_equal([atoms.indices for atomgroup in leaflets.results
for atoms in atomgroup],