diff --git a/doc/source/index.rst b/doc/source/index.rst
index 521318f8..6e53ef93 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -151,6 +151,13 @@ are several options:
 The ``fname_pattern`` can be a global setting at the top-level of the
 configuration, and overridden on area and product levels.
 
+It is possible to force the saving to be eager by defining
+``eager_writing: True`` in the product list. Eager saving means that
+each dataset is saved separately. Using this option will most likely
+increase the processing time significantly, but it is necessary until
+a `bug in XArray NetCDF4 handling
+<https://github.com/pydata/xarray/issues/6300>`_ is fixed.
+
 Messaging for saved datasets
 ****************************
 
diff --git a/examples/pl.yaml b/examples/pl.yaml
index df516f94..1a596785 100644
--- a/examples/pl.yaml
+++ b/examples/pl.yaml
@@ -57,6 +57,12 @@ product_list: &product_list
   # min_coverage: 20 # at least 20% coverage
   # use_tmp_file: False # create temporary filename first
   # staging_zone: "/data/pytroll/tmp/staging_zone" # create files here first
+  # Force eager writing and computation of datasets.
+  # Required when saving several datasets to a single CF/NetCDF4 file until
+  # the bug in XArray NetCDF4 handling has been fixed. This option will most
+  # likely increase the required processing time.
+  # Issue on the XArray bug: https://github.com/pydata/xarray/issues/6300
+  # eager_writing: True
 
   areas:
     omerc_bb:
diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py
index c9f6a4bb..2c9eef2b 100644
--- a/trollflow2/plugins/__init__.py
+++ b/trollflow2/plugins/__init__.py
@@ -206,8 +206,11 @@ def prepared_filename(fmat, renames):
         yield orig_filename
 
 
-def save_dataset(scns, fmat, fmat_config, renames):
-    """Save one dataset to file, not doing the actual computation."""
+def save_dataset(scns, fmat, fmat_config, renames, compute=False):
+    """Save one dataset to file.
+
+    If `compute=False` (the default), the saving is delayed and done lazily.
+ """ obj = None try: with prepared_filename(fmat, renames) as filename: @@ -222,12 +225,12 @@ def save_dataset(scns, fmat, fmat_config, renames): dsids.append(_create_data_query(prod, res)) obj = scns[fmat['area']].save_datasets(datasets=dsids, filename=filename, - compute=False, **kwargs) + compute=compute, **kwargs) else: dsid = _create_data_query(fmat['product'], res) obj = scns[fmat['area']].save_dataset(dsid, filename=filename, - compute=False, **kwargs) + compute=compute, **kwargs) except KeyError as err: LOG.warning('Skipping %s: %s', fmat['product'], str(err)) else: @@ -274,15 +277,15 @@ def save_datasets(job): objs = [] base_config = job['input_mda'].copy() base_config.pop('dataset', None) - + eager_writing = job['product_list']['product_list'].get("eager_writing", False) with renamed_files() as renames: for fmat, fmat_config in plist_iter(job['product_list']['product_list'], base_config): - obj = save_dataset(scns, fmat, fmat_config, renames) + obj = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing) if obj is not None: objs.append(obj) job['produced_files'].put(fmat_config['filename']) - - compute_writer_results(objs) + if not eager_writing: + compute_writer_results(objs) def product_missing_from_scene(product, scene): diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 428c2a42..5458b3eb 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -367,18 +367,11 @@ def close(self): def test_save_datasets(self): """Test saving datasets.""" self.maxDiff = None - from yaml import UnsafeLoader from trollflow2.plugins import save_datasets, DEFAULT - job = {} - job['input_mda'] = input_mda - job['product_list'] = { - 'product_list': read_config(raw_string=yaml_test_save, Loader=UnsafeLoader)['product_list'], - } - job['resampled_scenes'] = {} + the_queue = mock.MagicMock() + job = _create_job_for_save_datasets() job['produced_files'] = the_queue - for area in job['product_list']['product_list']['areas']: - job['resampled_scenes'][area] = mock.Mock() with mock.patch('trollflow2.plugins.compute_writer_results'),\ mock.patch('trollflow2.plugins.DataQuery') as dsid,\ mock.patch('os.rename') as rename: @@ -553,6 +546,39 @@ def test_save_datasets(self): for fname, efname in zip(the_queue.put.mock_calls, filenames): self.assertEqual(fname, mock.call(efname)) + def test_save_datasets_eager(self): + """Test saving datasets in eager manner.""" + from trollflow2.plugins import save_datasets + + job = _create_job_for_save_datasets() + job['product_list']['product_list']['eager_writing'] = True + with mock.patch('trollflow2.plugins.compute_writer_results') as compute_writer_results,\ + mock.patch('trollflow2.plugins.DataQuery'),\ + mock.patch('os.rename'): + save_datasets(job) + sd_calls = (job['resampled_scenes']['euron1'].save_dataset.mock_calls + + job['resampled_scenes']['omerc_bb'].save_dataset.mock_calls) + for sd in sd_calls: + assert "compute=True" in str(sd) + sds_calls = job['resampled_scenes']['euron1'].save_datasets.mock_calls + for sds in sds_calls: + assert "compute=True" in str(sds) + compute_writer_results.assert_not_called() + + +def _create_job_for_save_datasets(): + from yaml import UnsafeLoader + job = {} + job['input_mda'] = input_mda + job['product_list'] = { + 'product_list': read_config(raw_string=yaml_test_save, Loader=UnsafeLoader)['product_list'], + } + job['resampled_scenes'] = {} + job['produced_files'] = mock.Mock() + for area in 
job['product_list']['product_list']['areas']: + job['resampled_scenes'][area] = mock.Mock() + return job + def test_use_staging_zone_no_tmpfile(): """Test `prepared_filename` context with staging zone.
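
For reference, a minimal sketch of the eager/lazy control flow this patch introduces. This is not the trollflow2 implementation: `save_all`, `scn`, `to_save`, and `product_list` are hypothetical stand-ins for what `plist_iter()` provides, while `Scene.save_dataset` and `compute_writer_results` are the Satpy APIs the patch already uses.

from satpy.writers import compute_writer_results


def save_all(scn, to_save, product_list):
    """Write datasets eagerly or lazily depending on ``eager_writing``.

    Sketch only: `scn` is assumed to be a resampled satpy.Scene and
    `to_save` a hypothetical iterable of (dataset_query, filename) pairs.
    """
    eager_writing = product_list.get("eager_writing", False)
    objs = []
    for dsid, filename in to_save:
        # compute=True writes this dataset to disk immediately (eager);
        # compute=False returns a delayed writer object instead (lazy).
        obj = scn.save_dataset(dsid, filename=filename, compute=eager_writing)
        if not eager_writing and obj is not None:
            objs.append(obj)
    if not eager_writing:
        # Lazy path: compute all delayed writes in one pass, letting dask
        # share intermediate results between files.
        compute_writer_results(objs)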