Merge pull request #138 from pnuu/feature-eager-saving
Make it possible to do eager saving
pnuu authored Mar 10, 2022
2 parents 97882c0 + dbd4e5d commit c9b3ad1
Showing 4 changed files with 59 additions and 17 deletions.
7 changes: 7 additions & 0 deletions doc/source/index.rst
@@ -151,6 +151,13 @@ are several options:
 The ``fname_pattern`` can be a global setting at the top-level of the
 configuration, and overridden on area and product levels.
 
+It is possible to force the saving to be eager by setting
+``eager_writing: True`` in the product list. With eager saving, each
+dataset is saved separately. This will most likely increase the
+processing time significantly, but it is necessary when saving several
+datasets to a single NetCDF4 file until a `bug in XArray's NetCDF4
+handling <https://github.com/pydata/xarray/issues/6300>`_ is fixed.
+
 Messaging for saved datasets
 ****************************
 
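The lazy/eager distinction documented above comes from Satpy's writer API: with `compute=False` the writers return delayed objects that can be computed together in one pass. Below is a minimal sketch of both modes, assuming Satpy's `Scene` API; the reader, composite, and file names are hypothetical placeholders, not part of this commit.

```python
# Sketch of lazy vs. eager saving with Satpy. The reader, composite and
# file names below are hypothetical placeholders.
from satpy import Scene
from satpy.writers import compute_writer_results

scn = Scene(filenames=["/path/to/segment"], reader="seviri_l1b_hrit")
scn.load(["overview", "natural_color"])

# Lazy (trollflow2 default): collect delayed writer results and compute
# them together, letting Dask share intermediate results between files.
results = [
    scn.save_dataset("overview", filename="overview.tif", compute=False),
    scn.save_dataset("natural_color", filename="natural_color.tif", compute=False),
]
compute_writer_results(results)

# Eager (eager_writing: True): each call computes and writes immediately.
# Slower, but sidesteps the XArray NetCDF4 issue when several datasets go
# into a single CF/NetCDF4 file.
scn.save_datasets(writer="cf", filename="all_datasets.nc", compute=True)
```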
6 changes: 6 additions & 0 deletions examples/pl.yaml
@@ -57,6 +57,12 @@ product_list: &product_list
   # min_coverage: 20 # at least 20% coverage
   # use_tmp_file: False # create temporary filename first
   # staging_zone: "/data/pytroll/tmp/staging_zone" # create files here first
+  # Force eager writing and computation of datasets.
+  # Required when saving several datasets to a single CF/NetCDF4 file until
+  # the bug in XArray's NetCDF4 handling has been fixed. This option will
+  # most likely increase the required processing time.
+  # Issue on the XArray bug: https://github.com/pydata/xarray/issues/6300
+  # eager_writing: True
 
   areas:
     omerc_bb:
19 changes: 11 additions & 8 deletions trollflow2/plugins/__init__.py
@@ -206,8 +206,11 @@ def prepared_filename(fmat, renames):
     yield orig_filename
 
 
-def save_dataset(scns, fmat, fmat_config, renames):
-    """Save one dataset to file, not doing the actual computation."""
+def save_dataset(scns, fmat, fmat_config, renames, compute=False):
+    """Save one dataset to file.
+
+    If `compute=False` the saving is delayed and done lazily.
+    """
     obj = None
     try:
         with prepared_filename(fmat, renames) as filename:
@@ -222,12 +225,12 @@ def save_dataset(scns, fmat, fmat_config, renames):
                     dsids.append(_create_data_query(prod, res))
                 obj = scns[fmat['area']].save_datasets(datasets=dsids,
                                                        filename=filename,
-                                                       compute=False, **kwargs)
+                                                       compute=compute, **kwargs)
             else:
                 dsid = _create_data_query(fmat['product'], res)
                 obj = scns[fmat['area']].save_dataset(dsid,
                                                       filename=filename,
-                                                      compute=False, **kwargs)
+                                                      compute=compute, **kwargs)
     except KeyError as err:
         LOG.warning('Skipping %s: %s', fmat['product'], str(err))
     else:
@@ -274,15 +277,15 @@ def save_datasets(job):
     objs = []
     base_config = job['input_mda'].copy()
     base_config.pop('dataset', None)
 
+    eager_writing = job['product_list']['product_list'].get("eager_writing", False)
     with renamed_files() as renames:
         for fmat, fmat_config in plist_iter(job['product_list']['product_list'], base_config):
-            obj = save_dataset(scns, fmat, fmat_config, renames)
+            obj = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing)
             if obj is not None:
                 objs.append(obj)
                 job['produced_files'].put(fmat_config['filename'])
 
-    compute_writer_results(objs)
+    if not eager_writing:
+        compute_writer_results(objs)
 
 
 def product_missing_from_scene(product, scene):
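The `if not eager_writing:` guard above is what makes the lazy path pay off: all delayed writes are computed in one pass, so inputs shared between products are evaluated only once. A minimal illustration of that effect with plain `dask.delayed` follows; it is not trollflow2 code, and the functions are hypothetical stand-ins.

```python
# Why one batched compute beats per-file computes: graph nodes shared
# between delayed writes are evaluated once per compute() call.
import dask
from dask import delayed


@delayed
def expensive_input():
    # Stand-in for reading and resampling satellite data.
    return 42


@delayed
def write(data, fname):
    # Stand-in for a writer; just pretends to save.
    return f"wrote {data} to {fname}"


inp = expensive_input()
writes = [write(inp, f"out{i}.nc") for i in range(3)]

# Lazy: a single compute; expensive_input() runs once for all writes.
dask.compute(*writes)

# Eager: three separate computes; expensive_input() runs three times.
for w in writes:
    w.compute()
```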
44 changes: 35 additions & 9 deletions trollflow2/tests/test_trollflow2.py
@@ -367,18 +367,11 @@ def close(self):
     def test_save_datasets(self):
         """Test saving datasets."""
         self.maxDiff = None
-        from yaml import UnsafeLoader
         from trollflow2.plugins import save_datasets, DEFAULT
-        job = {}
-        job['input_mda'] = input_mda
-        job['product_list'] = {
-            'product_list': read_config(raw_string=yaml_test_save, Loader=UnsafeLoader)['product_list'],
-        }
-        job['resampled_scenes'] = {}
 
         the_queue = mock.MagicMock()
+        job = _create_job_for_save_datasets()
         job['produced_files'] = the_queue
-        for area in job['product_list']['product_list']['areas']:
-            job['resampled_scenes'][area] = mock.Mock()
         with mock.patch('trollflow2.plugins.compute_writer_results'),\
             mock.patch('trollflow2.plugins.DataQuery') as dsid,\
             mock.patch('os.rename') as rename:
@@ -553,6 +546,39 @@ def test_save_datasets(self):
         for fname, efname in zip(the_queue.put.mock_calls, filenames):
             self.assertEqual(fname, mock.call(efname))
 
+    def test_save_datasets_eager(self):
+        """Test saving datasets in an eager manner."""
+        from trollflow2.plugins import save_datasets
+
+        job = _create_job_for_save_datasets()
+        job['product_list']['product_list']['eager_writing'] = True
+        with mock.patch('trollflow2.plugins.compute_writer_results') as compute_writer_results,\
+            mock.patch('trollflow2.plugins.DataQuery'),\
+            mock.patch('os.rename'):
+            save_datasets(job)
+        sd_calls = (job['resampled_scenes']['euron1'].save_dataset.mock_calls
+                    + job['resampled_scenes']['omerc_bb'].save_dataset.mock_calls)
+        for sd in sd_calls:
+            assert "compute=True" in str(sd)
+        sds_calls = job['resampled_scenes']['euron1'].save_datasets.mock_calls
+        for sds in sds_calls:
+            assert "compute=True" in str(sds)
+        compute_writer_results.assert_not_called()
+
+
+def _create_job_for_save_datasets():
+    from yaml import UnsafeLoader
+    job = {}
+    job['input_mda'] = input_mda
+    job['product_list'] = {
+        'product_list': read_config(raw_string=yaml_test_save, Loader=UnsafeLoader)['product_list'],
+    }
+    job['resampled_scenes'] = {}
+    job['produced_files'] = mock.Mock()
+    for area in job['product_list']['product_list']['areas']:
+        job['resampled_scenes'][area] = mock.Mock()
+    return job
+
+
 def test_use_staging_zone_no_tmpfile():
     """Test `prepared_filename` context with staging zone.
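The eager test above asserts on the string form of the recorded mock calls. For reference, a small self-contained sketch of that pattern (the names are hypothetical and not part of the test suite); on Python 3.8+ `call.kwargs` gives a more precise handle on the same information.

```python
# Inspecting keyword arguments of recorded mock calls, as done in
# test_save_datasets_eager. Names here are hypothetical.
from unittest import mock

scene = mock.Mock()
scene.save_dataset("overview", filename="overview.nc", compute=True)

recorded = scene.save_dataset.mock_calls[0]
assert "compute=True" in str(recorded)     # string-based check, as in the test
assert recorded.kwargs["compute"] is True  # equivalent, more precise
```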
