Make NetCDF file cache handling compatible with dask distributed #2822
base: main
Conversation
Start work on a utility function to get a dask array from a dataset variable in a way that is friendly to dask.distributed.
For the distributed-friendly dask array helper, parameterise the test to cover more cases. Simplify the implementation.
We need to force the shape and the dtype when getting the dask-distributed-friendly xarray-dataarray. Seems to have a first working prototype now.
Add group support for getting a dask distributed friendly dask array. Speed up the related tests by sharing the dask distributed client setup and breakdown.
Add partial backward compatibility for accessing the file handle attribute when using caching with a NetCDF4FileHandler base class. Backward compatibility is not 100%: deleting the FileHandler closes the manager and therefore the ``file_handle`` property, but when the ``file_handle`` property is accessed after the FileHandler has been deleted, it is reopened. Therefore, calling ``__del__()`` manually and then accessing ``fh.file_handle`` will now return an open file (it used to return a closed file). This should not happen in any sane use scenario.
With the new dask-distributed-friendly caching, make sure we are respecting auto_maskandscale and are not applying scale factors twice.
Codecov Report: All modified and coverable lines are covered by tests ✅
Additional details and impacted files:
@@ Coverage Diff @@
## main #2822 +/- ##
=======================================
Coverage 96.05% 96.06%
=======================================
Files 370 370
Lines 54320 54382 +62
=======================================
+ Hits 52177 52240 +63
+ Misses 2143 2142 -1
Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
Remove a dead code except block that should never be reached.
Migrate TestNetCDF4FileHandler from unittest.TestCase to a regular class. Use a pytest fixture for the temporary NetCDF file.
Broaden the string that is matched against in TestNetCDF4FileHandler.test_filenotfound. On Linux and MacOS the expected failure gives "No such file or directory". On Windows it gives "Invalid file format".
Pull Request Test Coverage Report for Build 9597790771
Warning: This coverage report may be inaccurate. This pull request's base commit is no longer the HEAD commit of its target branch. This means it includes changes from outside the original pull request, including, potentially, unrelated coverage changes.
💛 - Coveralls
Awesome job deciphering how to use the CachingFileManager and wrapping things in a map_blocks task. I think this is really close to being done, but I had some concerns about the helper function.
satpy/readers/utils.py
Outdated
        manager (xarray.backends.CachingFileManager):
            Instance of xarray.backends.CachingFileManager encapsulating the
            dataset to be read.
We should check how the docs render this. If the argument type isn't "clickable" to go directly to the xarray docs for the CFM then we could wrap the mention of it in the description with:
:class:`xarray.backends.CachingFileManager`
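For reference, a hypothetical docstring excerpt showing where the cross-reference role could go (the function name and signature are taken from the diff below):

```python
def get_distributed_friendly_dask_array(manager, varname, chunks, dtype,
                                        group="/", auto_maskandscale=None):
    """Get a dask-distributed-friendly dask array from a file manager.

    Args:
        manager (xarray.backends.CachingFileManager):
            Instance of :class:`xarray.backends.CachingFileManager`
            encapsulating the dataset to be read.
    """
```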
satpy/readers/utils.py
Outdated
def get_distributed_friendly_dask_array(manager, varname, chunks, dtype,
                                        group="/", auto_maskandscale=None):
I'm not sure how I feel about this function name. Obviously it makes sense in this PR because it solves this specific problem, but it feels like there is a (shorter) more generic name that gets the point across. Another thing is that `distributed_friendly` is mentioned here, but that friendliness is a side effect of the "serializable" nature of the way you're accessing the data here, right? `get_serializable_dask_array`?
I don't feel super strongly about this, but the name was distracting to me so I thought I'd say something.
Renamed to `get_serializable_dask_array`.
satpy/readers/utils.py
Outdated
        method set_auto_maskandscale, such as is the case for
        NetCDF4.Dataset.
    """
    def get_chunk():
The `chunks` is never used here. The current calling from the file handler is accessing the full shape of the variable so this is fine, but only for now. I mean that `map_blocks` will only ever call this function once. However, if you added a `block_info` kwarg to the function signature, or whatever the `map_blocks` special keyword argument is, then you could change `[:]` to access a specific sub-set of the NetCDF file variable and only do a partial load. This should improve performance a lot (I think 🤞) if it was actually used in the file handler.
> The `chunks` is never used here.

Hm? I'm passing `chunks=chunks` when I call `da.map_blocks`. What do you mean, it is never used? Do you mean I could be using `chunk-location` and `num-chunks` from a `block_info` dictionary passed to `get_chunk`?

> The current calling from the file handler is accessing the full shape of the variable so this is fine, but only for now. I mean that `map_blocks` will only ever call this function once. However, if you added a `block_info` kwarg to the function signature or whatever the `map_blocks` special keyword argument is, then you could change `[:]` to access a specific sub-set of the NetCDF file variable and only do a partial load. This should improve performance a lot (I think 🤞) if it was actually used in the file handler.

I will try to wrap my head around this ☺
Yes, I think that's what I'm saying. I think the result of `get_chunk()` right now is broken for any chunk size other than the full shape of the array because you never do any slicing of the NetCDF variable inside `get_chunk()`. So, if you had a full array of 100x100 and a chunk size of 50x50, then map_blocks would call this function 4 times ((0-50, 0-50), (0-50, 50-100), (50-100, 0-50), (50-100, 50-100)). BUT each call would return the full variable 100x100. So I think this would be a case where the dask array would say "yeah, I have shape 100x100", but then once you computed it you'd get a 200x200 array back.
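A minimal sketch of what that fix might look like (hedged; the wiring of `manager`, `varname`, `chunks`, and `dtype` follows the helper signature shown above, and the exact implementation in this PR may differ):

```python
import dask.array as da
import numpy as np


def get_serializable_dask_array(manager, varname, chunks, dtype):
    """Sketch: build a dask array that reads one block per call from a NetCDF variable."""
    def get_chunk(block_info=None):
        # block_info[None] describes the output block currently being produced;
        # "array-location" holds the (start, stop) range along each dimension.
        loc = block_info[None]["array-location"]
        slices = tuple(slice(start, stop) for (start, stop) in loc)
        with manager.acquire_context() as nc:
            # Read only this block's slice instead of the whole variable.
            return nc[varname][slices]

    return da.map_blocks(get_chunk, chunks=chunks, dtype=dtype,
                         meta=np.array([], dtype=dtype))
```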
Fixed it now, I think.
Fix the spelling in the docstring example using netCDF4. Co-authored-by: David Hoese <[email protected]>
Add a workaround to prevent an unexpected type promotion in the unit test for dask distributed friendly dask arrays.
When getting a dask-distributed friendly dask array from a NetCDF file using the CachingFileManager, use the information provided in block_info on the array location, in case we are not reading the entire variable.
Rename get_distributed_friendly_dask_array to get_serialisable_dask_array and remove the group argument, moving the responsibility for handling groups to the caller.
Pytroll uses US spelling. Rename serialisable to serializable. Remove the removed keyword argument from the call.
Ensure that the meta we pass to map_blocks also has the right dtype. Not sure if this is necessary when map_blocks already has the right dtype, but it can't hurt.
Fixing three merge conflicts.
Pull Request Test Coverage Report for Build 10528447135
💛 - Coveralls
LGTM, just one comment inline. How is this affecting the performance of the reader? Do you consider this ready to be merged?
I had not tested that yet. I have now. Sadly, it gets worse :( A simple test script that reads FCI, loads some channels, resamples, and writes them again, not specifying any dask scheduler:

- With Satpy main: time 0:45.93, RAM 9.27 GB
- With this PR: time 0:55.0, RAM 10.0 GB

Additionally, upon exiting, there is the repeated error message:
Sadly no, considering the problems described above. I will dig into this.
With Satpy main, three runs, times in seconds:
- Scene creation: 13.9, 11.9, 11.5
- Loading: 1.5, 1.3, 1.0
- Computing: 6.5, 6.9, 6.5

With this PR:
- Scene creation: 13.2, 12.0, 12.0
- Loading: 3.5, 3.5, 4.4
- Computing: 5.5, 5.8, 5.6

So it's in particular the loading that gets slower.
Profiling reveals that there are 160 calls to |
When caching, make sure we use the CachingFileManager already upon scene creation and not only by the time we are loading.
With c2b1533 loading is much faster, although with more variability than for satpy main. Scene creation is a little slower.
- Scene creation: 13.4, 14.5, 12.8, 12.6, 12.8, 13.0
- Loading: 1.9, 1.0, 1.6, 1.4, 1.3, 1.0
I can't reliably reproduce the performance differences. Running it again with the main branch gives:
- Scene creation, main branch: 14.5, 14.3, 13.2
- Scene creation, this PR: 13.6, 12.7, 13.8

And with cProfile, it's always faster with my PR. Considering those uncertainties, I will declare that performance is the same within the measurement uncertainty.
Don't subclass netCDF4.Dataset, rather just return an instance from a helper function. Seems good enough and gets rid of the weird error messages upon exit.
Fixed the problem with the strange exception/error messages upon exit in 9fce5a7.
Some readers read entire groups; this needs xarray kwargs to be set even if caching is used.
I'm happy with this. @djhoese can you just confirm that you are good with this being merged? (and feel free to merge it if that's the case)
        if self.manager is None:
            return None
        return self.manager.acquire()

    @staticmethod
    def _set_file_handle_auto_maskandscale(file_handle, auto_maskandscale):
        if hasattr(file_handle, "set_auto_maskandscale"):
Not that this has to be handled in your PR, but if I remember correctly this `Dataset`-level `set_auto_maskandscale` was added to netcdf4-python quite a while ago. It seems error prone and confusing to silently call the method only if it exists and to not log/inform the user that it wasn't used when it was expected. Maybe we should remove this method on the file handler class and always call `file_handle.set_auto_maskandscale` no matter what. Your wrapper does it already.
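A hedged sketch of that simplification (the class name is just a stand-in for the real file handler, and keeping the `None` guard is an assumption):

```python
class NetCDF4FileHandler:  # stand-in for the real file handler class
    @staticmethod
    def _set_file_handle_auto_maskandscale(file_handle, auto_maskandscale):
        # Call the Dataset-level method unconditionally instead of guarding
        # with hasattr(); netCDF4.Dataset has provided set_auto_maskandscale
        # for a long time.
        if auto_maskandscale is not None:
            file_handle.set_auto_maskandscale(auto_maskandscale)
```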
    @pytest.fixture(scope="class")
    def dask_dist_client(self):
        """Set up and close a dask distributed client."""
        from dask.distributed import Client
        cl = Client()
        yield cl
        cl.close()
It looks like the dask developers recommend using their test utilities for writing distributed-based tests:
https://distributed.dask.org/en/latest/develop.html#writing-tests
Would it be possible to use their tools instead of this?
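For example, a hedged sketch using the `gen_cluster` decorator from `distributed.utils_test` (the test name and array are placeholders):

```python
import dask.array as da
from distributed.utils_test import gen_cluster


@gen_cluster(client=True)
async def test_with_distributed_cluster(c, s, a, b):
    # gen_cluster starts an in-process scheduler (s) and two workers (a, b)
    # and hands the test an asynchronous client (c); no manual setup/teardown.
    arr = da.zeros((10, 10), chunks=5)
    assert await c.compute(arr.sum()) == 0
```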
Looks like you might be able to import this fixture and use it:
This PR makes file cache handling in the NetCDF4FileHandler compatible with dask distributed. It adds a utility function in `satpy.readers.utils` called `get_distributed_friendly_dask_array`, which produces a dask array from a netCDF4 variable that can be used in an xarray DataArray while the dask graph remains picklable, and thus computable, when it includes this array. This utility function is now used in the NetCDF4FileHandler, where the homegrown file handle caching is replaced by caching via xarray.backends.CachingFileManager, which is needed to implement the aforementioned utility function.
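A rough usage sketch of the helper (hedged; it assumes the renamed `get_serializable_dask_array` and the signature discussed above, and the file path, variable name, chunks, and dtype are placeholders):

```python
import numpy as np
from netCDF4 import Dataset
from xarray.backends import CachingFileManager

from satpy.readers.utils import get_serializable_dask_array

# The CachingFileManager is picklable, so dask workers can (re)open the file
# themselves instead of receiving an unpicklable open netCDF4.Dataset.
manager = CachingFileManager(Dataset, "/path/to/file.nc", mode="r")

arr = get_serializable_dask_array(
    manager, "some_variable", chunks=(100, 100), dtype=np.float32)
```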