update to dask 0.18.0 #66
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,7 @@ The rules for this file: | |
* release numbers follow "Semantic Versioning" http://semver.org | ||
|
||
------------------------------------------------------------------------------ | ||
xx/xx/18 VOD555, richardjgowers, iparask, orbeckst | ||
11/xx/18 VOD555, richardjgowers, iparask, orbeckst, kain88-de | ||
|
||
* 0.2.0 | ||
|
||
|
@@ -27,6 +27,15 @@ Fixes | |
* always distribute frames over blocks so that no empty blocks are | ||
created ("balanced blocks", Issue #71) | ||
|
||
Changes | ||
* requires dask >= 0.18.0 and respects/requires globally setting of the dask | ||
scheduler (Issue #48) | ||
* removed the 'scheduler' keyword from the run() method; use | ||
dask.config.set(scheduler=...) as recommended in the dask docs | ||
* uses single-threaaded scheduler if n_jobs=1 (Issue #17) | ||
* n_jobs=1 is now the default for run() (used to be n_jobs=-1) | ||
* dask.distributed is now a dependency | ||
Review comment: setup.py lists it as a full dependency; it could be moved into the test dependencies if you really want to keep it optional. If you make it fully optional, please remove this line from CHANGELOG.
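For readers migrating from the removed `scheduler` keyword, the "Changes" entries above can be illustrated with a minimal sketch. It assumes dask >= 0.18.0 is installed; the `square` task and the `total` graph are hypothetical stand-ins and not part of this PR. Only `dask.config.set(scheduler=...)` is taken from the changelog text.

```python
import dask
from dask import delayed


@delayed
def square(x):
    # Hypothetical unit of work standing in for a per-block analysis.
    return x * x


# Build a small task graph: sum of squares of 0..3.
total = delayed(sum)([square(i) for i in range(4)])

# Select the scheduler through dask's configuration instead of passing
# it as a keyword argument. Used as a context manager, the setting only
# applies inside the with block.
with dask.config.set(scheduler="single-threaded"):
    result = total.compute()

print(result)
```

Calling `dask.config.set(scheduler=...)` without `with` applies the setting globally, which appears to be the usage the changelog entry has in mind.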
||
|
||
|
||
06/07/18 orbeckst | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,8 @@ | |
# | ||
# Released under the GNU Public Licence, v2 or any higher version | ||
|
||
from dask import distributed, multiprocessing | ||
from dask import distributed | ||
Review comment: tests require distributed
||
import dask | ||
import pytest | ||
|
||
|
||
|
@@ -24,9 +25,13 @@ def client(tmpdir_factory, request): | |
lc.close() | ||
|
||
|
||
@pytest.fixture(scope='session', params=('distributed', 'multiprocessing')) | ||
@pytest.fixture(scope='session', params=('distributed', | ||
'multiprocessing', | ||
'single-threaded')) | ||
def scheduler(request, client): | ||
if request.param == 'distributed': | ||
return client | ||
arg = client | ||
else: | ||
return multiprocessing | ||
arg = request.param | ||
with dask.config.set(scheduler=arg): | ||
yield |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,7 @@ are provided as keyword arguments: | |
|
||
set up the parallel analysis | ||
|
||
.. method:: run(n_jobs=-1, scheduler=None) | ||
.. method:: run(n_jobs=-1) | ||
Review comment: I think the default is now
||
|
||
perform parallel analysis; see :ref:`parallelization` | ||
for explanation of the arguments | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,8 +21,9 @@ | |
from six.moves import range | ||
|
||
import MDAnalysis as mda | ||
from dask import distributed, multiprocessing | ||
from dask.delayed import delayed | ||
import dask | ||
import dask.distributed | ||
from joblib import cpu_count | ||
import numpy as np | ||
|
||
|
@@ -267,7 +268,6 @@ def run(self, | |
start=None, | ||
stop=None, | ||
step=None, | ||
scheduler=None, | ||
n_jobs=1, | ||
n_blocks=None): | ||
"""Perform the calculation | ||
|
@@ -280,9 +280,6 @@ def run(self, | |
stop frame of analysis | ||
step : int, optional | ||
number of frames to skip between each analysed frame | ||
scheduler : dask scheduler, optional | ||
Use dask scheduler, defaults to multiprocessing. This can be used | ||
to spread work to a distributed scheduler | ||
n_jobs : int, optional | ||
number of jobs to start, if `-1` use number of logical cpu cores. | ||
This argument will be ignored when the distributed scheduler is | ||
|
@@ -292,24 +289,43 @@ def run(self, | |
to n_jobs or number of available workers in scheduler. | ||
|
||
""" | ||
# are we using a distributed scheduler or should we use | ||
# multiprocessing? | ||
scheduler = dask.config.get('scheduler', None) | ||
if scheduler is None: | ||
scheduler = multiprocessing | ||
# maybe we can grab a global worker | ||
try: | ||
scheduler = dask.distributed.worker.get_client() | ||
except ValueError: | ||
pass | ||
|
||
if n_jobs == -1: | ||
n_jobs = cpu_count() | ||
|
||
# we could not find a global scheduler to use and we ask for a single | ||
# job. Therefore we run this on the single threaded scheduler for | ||
# debugging. | ||
if scheduler is None and n_jobs == 1: | ||
scheduler = 'single-threaded' | ||
Review comment: fixes #17
Review comment: nice
||
|
||
# fall back to multiprocessing, we tried everything | ||
if scheduler is None: | ||
scheduler = 'multiprocessing' | ||
|
||
if n_blocks is None: | ||
if scheduler == multiprocessing: | ||
if scheduler == 'multiprocessing': | ||
n_blocks = n_jobs | ||
elif isinstance(scheduler, distributed.Client): | ||
elif isinstance(scheduler, dask.distributed.Client): | ||
n_blocks = len(scheduler.ncores()) | ||
else: | ||
raise ValueError( | ||
"Couldn't guess ideal number of blocks from scheduler." | ||
n_blocks = 1 | ||
warnings.warn( | ||
"Couldn't guess ideal number of blocks from scheduler. " | ||
"Setting n_blocks=1. " | ||
"Please provide `n_blocks` in call to method.") | ||
|
||
scheduler_kwargs = {'get': scheduler.get} | ||
if scheduler == multiprocessing: | ||
scheduler_kwargs = {'scheduler': scheduler} | ||
if scheduler == 'multiprocessing': | ||
scheduler_kwargs['num_workers'] = n_jobs | ||
|
||
start, stop, step = self._trajectory.check_slice_indices( | ||
|
Review comment: typo, needs fixing
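The fallback order introduced in the `run()` hunk (configured scheduler first, then single-threaded for `n_jobs == 1`, then multiprocessing) can be sketched as two plain functions that need no dask import. This is a simplified reading of the diff, not the PR's code. `resolve_scheduler` and `guess_n_blocks` are hypothetical names, `configured` stands in for whatever `dask.config.get('scheduler', None)` or a discovered client would return, `n_workers` stands in for the distributed-client branch, and `os.cpu_count` is used here in place of the `joblib` helper imported in the diff.

```python
import warnings
from os import cpu_count


def resolve_scheduler(configured, n_jobs):
    """Return (scheduler, n_jobs) following the fallback order in the diff."""
    if n_jobs == -1:
        # Assumption: os.cpu_count substitutes for joblib.cpu_count.
        n_jobs = cpu_count() or 1
    if configured is not None:
        # A globally configured scheduler (or client) always wins.
        return configured, n_jobs
    if n_jobs == 1:
        # Single job and nothing configured: run serially for debugging.
        return "single-threaded", n_jobs
    # Last resort.
    return "multiprocessing", n_jobs


def guess_n_blocks(scheduler, n_jobs, n_workers=None):
    """Guess the number of blocks when the caller did not provide one."""
    if scheduler == "multiprocessing":
        return n_jobs
    if n_workers is not None:
        # Stands in for the distributed.Client branch of the diff.
        return n_workers
    warnings.warn(
        "Couldn't guess ideal number of blocks from scheduler. "
        "Setting n_blocks=1. "
        "Please provide `n_blocks` in call to method.")
    return 1
```

One consequence visible in this sketch: with the new default `n_jobs=1` and no configured scheduler, the single-threaded path reaches the warning branch unless `n_blocks` is passed explicitly.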