update to dask 0.18.0 #66
Conversation
Assuming tests pass eventually, this looks awesome to me.
EDIT: Also, update docs please! (I only realized after reviewing...)
pmda/test/test_parallel.py
@@ -95,7 +95,7 @@ def scheduler(request, client):
     if request.param == 'distributed':
         return client
     else:
-        return multiprocessing
+        return request.param
This is cool: so now we will be able to expand to all schedulers that dask supports by just adding strings to the fixture parametrization.
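For illustration, a sketch of what such a parametrized fixture could look like (the exact decorator is not shown in the diff above, so the parameter list is an assumption):

import pytest

# every entry except 'distributed' is passed straight through as a
# scheduler name string; 'distributed' needs the actual client object
@pytest.fixture(params=['distributed', 'multiprocessing', 'synchronous'])
def scheduler(request, client):
    if request.param == 'distributed':
        return client
    return request.param

Adding another scheduler then only means adding its name string to params.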
@kain88-de I might have approved a bit prematurely, but in principle I think this is very good and I trust that you know best what else needs to be done.
The docs do not mention get anywhere because a while back we switched to scheduler for the user API.
Yeah, we had a crystal ball last year when we started with the word scheduler. I don't remember ever using get.
Dask introduced some more changes that require larger changes within pmda. In the end it should make the code clearer. @VOD555 can you look into this? The relevant documentation is linked below.
The main issue is that dask now uses a global configuration (dask.config.set) to select the scheduler.
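For reference, a minimal illustration of the global configuration mechanism referred to here (a sketch based on the dask documentation, not code from this PR):

import dask

# set the default scheduler globally ...
dask.config.set(scheduler='multiprocessing')

# ... or only for a block of code
with dask.config.set(scheduler='synchronous'):
    pass  # anything computed in this block uses the synchronous scheduler

# inspect the currently configured default (None if nothing was set)
print(dask.config.get('scheduler', None))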
…ring
- modified tests so that they use the default scheduler
- supplying n_jobs
- NOTE: test_leaflets() fails for n_jobs=2; this NEEDS TO BE FIXED in a separate PR; right now this is marked as XFAIL
pmda/test/test_leaflet.py
@@ -39,24 +38,29 @@ def correct_values(self):
     def correct_values_single_frame(self):
         return [np.arange(1, 2150, 12), np.arange(2521, 4670, 12)]

-    def test_leaflet(self, universe, correct_values):
+    # XFAIL for 2 jobs needs to be fixed!
+    @pytest.mark.parametrize('n_jobs', (1, pytest.mark.xfail(2)))
@iparask the test_leaflets test failed for me with n_jobs=2:
E AssertionError:
E Arrays are not almost equal to 7 decimals
E error: leaflets should match test values
E (shapes (1,), (6,) mismatch)
E x: array([36634])
E y: array([36507, 36761, 37523, 37650, 38031, 38285])
My expectation was that this should give the same answer, just run faster... Can you please look into this?
See also #76
With these changes, all tests pass locally (with dask 0.20)
@kain88-de in #66 (comment) you commented on a new global state in dask. How would that manifest itself as a problem for the tests? My understanding of

@pytest.fixture(scope="session", params=(1, 2))
def client(tmpdir_factory, request):
    with tmpdir_factory.mktemp("dask_cluster").as_cwd():
        lc = distributed.LocalCluster(n_workers=request.param, processes=True)
        client = distributed.Client(lc)
        yield client
        client.close()
        lc.close()

is that we set up a single distributed cluster for all tests (actually, two clusters, one with 1 and one with 2 workers) and tests that use it get scheduled as workers become available. Can you explain how the session scope is a problem for new dask?
- passes 'multiprocessing' as the scheduler instead of multiprocessing (which does not work with dask >= 0.20 anymore)
- actually passes whatever we define as parameter; only distributed is currently an exception
- removed superfluous import of distributed.multiprocessing
Codecov Report
@@            Coverage Diff             @@
##           master      #66      +/-   ##
==========================================
- Coverage   98.09%   94.41%   -3.68%
==========================================
  Files           8        8
  Lines         419      448      +29
  Branches       58       61       +3
==========================================
+ Hits          411      423      +12
- Misses          4       18      +14
- Partials        4        7       +3
Continue to review full report at Codecov.
@kain88-de @richardjgowers @VOD555 The tests for the upgrade to dask 0.20 pass now; the coverage dropped for reasons that I do not understand. I had reviewed and approved this PR before it was passing Travis, but now that I edited it, I'd appreciate some additional eyes, please.
Locally I get all tests passing but different coverage changes:
Using
shows that
Perhaps coverage has a hard time seeing what's been covered when something is run under dask, under certain circumstances?
This test is needed to get coverage of leaflet back up but: TEST or CODE needs to be fixed.
I wanted to add tests for leafletfinder with distributed (by using the new parametrized scheduler fixture).
In my local tests, the things under
I haven't looked at the code changes yet! But I did stop working on this because dask 0.18 has changed the idiomatic style to change the scheduler (see the dask docs). The new idiom is to set the scheduler in a global variable

dask.config.set(scheduler='threads')

or with a context manager

with dask.config.set(scheduler='threads'):
    x.compute()

The distributed scheduler now overwrites these defaults on creation:

from dask.distributed import Client
client = Client(...)  # Connect to distributed cluster and override default
df.x.sum().compute()  # This now runs on the distributed system

The correct solution seems to rather be we remove the

@pytest.fixture(params=['multiprocessing', ClientIP])
def scheduler(params):
    with dask.config.set(params):
        yield

I assume I have the API wrong, but the general idea is to start a context manager in the fixture and yield to release it at the end. How well this works I don't know.
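For what it's worth, a sketch of that idea with what I believe is the current dask API (the parameter values are assumptions and this is untested):

import dask
import pytest

@pytest.fixture(params=['multiprocessing', 'threads', 'synchronous'])
def scheduler(request):
    # make the chosen scheduler the dask default only for the duration
    # of the test, then release it again
    with dask.config.set(scheduler=request.param):
        yield request.param

A distributed client would still need special handling, e.g. a separate fixture or parameter that creates the Client inside the context.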
From my reading, setting the scheduler on

x.compute(scheduler='threads')

is still supported. I think as long as all our

client = Client(...)  # Connect to distributed cluster and override default

will set the global defaults. Or do I misunderstand how this is working now?
I think you're right that this is the correct medium-term solution so that using PMDA conforms to how people use Dask. In the short term (i.e., for this PR at least!) I'd like to move ahead with our current scheduler argument because it is still correct. Or do you see a problem?
(Alternatively, if someone manages to get the new Dask paradigm working, I am also happy... I just have limited time for this right now.)
The problem is that code suddenly behaves unexpectedly. Take the following example:

client = Client()  # yeah, let's use dask.distributed
pmda.contacts.Contacts.run()  # This uses multiprocessing!

I would be surprised to see here that the distributed workers don't receive any jobs. Without knowledge of how dask used to work this is also hard to debug.
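One way to avoid that surprise (a sketch only; the helper name is made up and this is not code from this PR) would be to apply pmda's own multiprocessing default only when the user has not configured anything in dask:

import dask
from dask import distributed

def _choose_scheduler(n_jobs):
    # respect an explicitly configured dask scheduler or an active
    # distributed Client before applying pmda's own default
    if dask.config.get('scheduler', None) is not None:
        return None  # let dask use whatever was configured
    try:
        distributed.get_client()
        return None  # an active Client() should win
    except ValueError:
        pass
    return 'multiprocessing' if n_jobs != 1 else 'synchronous'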
        # job. Therefore we run this on the single threaded scheduler for
        # debugging.
        if scheduler is None and n_jobs == 1:
            scheduler = 'single-threaded'
fixes #17
nice
- someone should check with dask; it seems a bit brittle
- fix tests maybe
- update documentation

fixes #17
... because PMDA defaults to 'multiprocessing'? Yes, I agree with you. Thanks for working on it!
- fix #48
- updated boiler-plate code in ParallelAnalysisBase.run and copied and pasted into leaflet.LeafletFinder.run() (TODO: make this more DRY)
- dask.distributed added as dependency (it is recommended by dask for a single node anyway, and it avoids imports inside if statements... much cleaner code in PMDA)
- removed scheduler kwarg: use dask.config.set(scheduler=...)
- 'multiprocessing' and n_jobs=-1 are now only selected if nothing is set by dask; if one wants n_jobs=-1 to always grab all cores then you must set the multiprocessing scheduler
- default is now n_jobs=1 (instead of -1), i.e., the single-threaded scheduler
- updated tests
- removed unnecessary broken(?) test for "no deprecations" in parallel.ParallelAnalysisBase
- updated CHANGELOG
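To illustrate the new user-facing behaviour described in this commit message (a sketch: pmda.rms.RMSD is only used as an example analysis and the atom-group setup is hypothetical):

import dask
import pmda.rms

analysis = pmda.rms.RMSD(atomgroup, ref_atomgroup)  # hypothetical setup

# default: n_jobs=1, i.e. the single-threaded scheduler
analysis.run()

# multiprocessing with 4 workers, chosen because nothing is configured in dask
analysis.run(n_jobs=4)

# select the scheduler through dask instead of a run() keyword
with dask.config.set(scheduler='multiprocessing'):
    analysis.run(n_jobs=-1)  # use all available cores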
- install conda package of MDA on travis
- require MDA and MDATests >= 0.19.0
@kain88-de are you sure you wanted to push 114a2b0? It looks as if it undoes some of the changes that I pushed and breaks the tests again. If it was intentional and you're working on it then just ignore me ;-).
I don't want to depend on distributed for such simple checks. The code now does a trial import when necessary. We can still check the distributed scheduler in our tests; it is not needed for pmda itself, though.
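A minimal sketch of the trial-import pattern being described (illustrative only; the actual helper name and error message in pmda may differ):

def _import_distributed():
    # import dask.distributed only when the user actually asks for the
    # distributed scheduler, so plain pmda does not require the package
    try:
        from dask import distributed
    except ImportError:
        raise ImportError("running with the distributed scheduler requires "
                          "the 'distributed' package")
    return distributed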
Fine with me, although I am pretty sure that anyone using dask will also install distributed or not mind having it installed, especially as http://docs.dask.org/en/latest/scheduling.html says
i.e., pretty much in all cases.
@kain88-de please check – I'd like to get this merged so that we can move forward, and I'd like to release 0.2.0 as soon as possible.
Minor comments.
  scheduler (Issue #48)
* removed the 'scheduler' keyword from the run() method; use
  dask.config.set(scheduler=...) as recommended in the dask docs
* uses single-threaaded scheduler if n_jobs=1 (Issue #17)
typo, needs fixing
  dask.config.set(scheduler=...) as recommended in the dask docs
* uses single-threaaded scheduler if n_jobs=1 (Issue #17)
* n_jobs=1 is now the default for run() (used to be n_jobs=-1)
* dask.distributed is now a dependency
setup.py has it as full dep; it could be moved into test dependencies if you really want to keep it optional. If you make it fully optional, please remove this line from CHANGELOG
@@ -8,7 +8,8 @@
 #
 # Released under the GNU Public Licence, v2 or any higher version

-from dask import distributed, multiprocessing
+from dask import distributed
tests require distributed
@@ -17,7 +17,7 @@ are provided as keyword arguments:

    set up the parallel analysis

-   .. method:: run(n_jobs=-1, scheduler=None)
+   .. method:: run(n_jobs=-1)
I think the default is now n_jobs=1, isn't it?
@@ -91,6 +86,10 @@ def test_no_frames(analysis, n_jobs):
     assert analysis.timing.universe == 0


+def test_scheduler(analysis, scheduler):
+    analysis.run()
No assert here – either ERROR or pass?
I removed my changes again. Let's go with the easier version.
I don't know where the reduced coverage comes from right now.
I'll merge it regardless and then we need to dig a bit more into how coverage works with the different schedulers. Thanks for pushing forward here!!!
Stupid GitHub web interface does not work on my slightly outdated mobile. Can you please do a squash merge? Thanks! This will allow @VOD555 to continue.
@orbeckst I've merged this PR.
Thanks. The drop in coverage is due to
Fix #48 and #17
Changes made in this Pull Request:
PR Checklist