From 475cab24e5df134b0198cca2eeef49bc42a62151 Mon Sep 17 00:00:00 2001 From: James Douglass Date: Wed, 9 Oct 2024 16:31:03 -0700 Subject: [PATCH 01/10] Adding a more human-readable RuntimeError. RE:#360 --- src/pygeoprocessing/geoprocessing.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/pygeoprocessing/geoprocessing.py b/src/pygeoprocessing/geoprocessing.py index 3935f4c3..43d47975 100644 --- a/src/pygeoprocessing/geoprocessing.py +++ b/src/pygeoprocessing/geoprocessing.py @@ -36,8 +36,9 @@ from .geoprocessing_core import DEFAULT_CREATION_OPTIONS from .geoprocessing_core import DEFAULT_GTIFF_CREATION_TUPLE_OPTIONS from .geoprocessing_core import DEFAULT_OSR_AXIS_MAPPING_STRATEGY +from .geoprocessing_core import gdal_use_exceptions +from .geoprocessing_core import GDALUseExceptions from .geoprocessing_core import INT8_CREATION_OPTIONS -from .geoprocessing_core import gdal_use_exceptions, GDALUseExceptions # This is used to efficiently pass data to the raster stats worker if available if sys.version_info >= (3, 8): @@ -3286,7 +3287,13 @@ def _fill_work_queue(): while True: # the timeout guards against a worst case scenario where the # ``_convolve_2d_worker`` has crashed. - write_payload = write_queue.get(timeout=_MAX_TIMEOUT) + try: + write_payload = write_queue.get(timeout=max_timeout) + except queue.Empty: + raise RuntimeError( + f"The convolution worker timed out after {max_timeout} " + "seconds. Either the timeout is too low for the " + "size of your data, or the worker has crashed.") if write_payload: (index_dict, result, mask_result, left_index_raster, right_index_raster, @@ -3825,7 +3832,9 @@ def get_gis_type(path): ``pygeoprocessing.RASTER_TYPE``, or ``pygeoprocessing.VECTOR_TYPE``. """ - from pygeoprocessing import UNKNOWN_TYPE, RASTER_TYPE, VECTOR_TYPE + from pygeoprocessing import RASTER_TYPE + from pygeoprocessing import UNKNOWN_TYPE + from pygeoprocessing import VECTOR_TYPE gis_type = UNKNOWN_TYPE try: gis_raster = gdal.OpenEx(path, gdal.OF_RASTER) From 3100b17e4d596699092b1b6e43bd2678f0f187c6 Mon Sep 17 00:00:00 2001 From: James Douglass Date: Wed, 9 Oct 2024 16:31:42 -0700 Subject: [PATCH 02/10] Adding a test for the new exception. RE:#360 --- tests/test_geoprocessing.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/tests/test_geoprocessing.py b/tests/test_geoprocessing.py index 268f12b9..15671c6e 100644 --- a/tests/test_geoprocessing.py +++ b/tests/test_geoprocessing.py @@ -32,10 +32,10 @@ from pygeoprocessing.geoprocessing_core import DEFAULT_CREATION_OPTIONS from pygeoprocessing.geoprocessing_core import \ DEFAULT_GTIFF_CREATION_TUPLE_OPTIONS +from pygeoprocessing.geoprocessing_core import gdal_use_exceptions from pygeoprocessing.geoprocessing_core import INT8_CREATION_OPTIONS from pygeoprocessing.geoprocessing_core import \ INT8_GTIFF_CREATION_TUPLE_OPTIONS -from pygeoprocessing.geoprocessing_core import gdal_use_exceptions _DEFAULT_ORIGIN = (444720, 3751320) _DEFAULT_PIXEL_SIZE = (30, -30) @@ -3416,6 +3416,27 @@ def test_convolve_2d_ignore_undefined_nodata(self): numpy.isclose(signal_nodata_array, signal_nodata_none_array).all(), 'signal with nodata should be the same as signal with none') + def test_convolve_2d_error_on_worker_timeout(self): + """PGP.geoprocessing: test convolve 2d error when worker times out.""" + n_pixels = 10000 + n_kernel_pixels = 17500 + signal_array = numpy.ones((n_pixels, n_pixels), numpy.float32) + test_value = 0.5 + signal_array[:] = test_value + target_nodata = -1 + signal_path = os.path.join(self.workspace_dir, 'signal.tif') + _array_to_raster(signal_array, target_nodata, signal_path) + kernel_path = os.path.join(self.workspace_dir, 'kernel.tif') + kernel_array = numpy.zeros( + (n_kernel_pixels, n_kernel_pixels), numpy.float32) + kernel_array[int(n_kernel_pixels/2), int(n_kernel_pixels/2)] = 1 + _array_to_raster(kernel_array, target_nodata, kernel_path) + target_path = os.path.join(self.workspace_dir, 'target.tif') + with self.assertRaises(RuntimeError): + pygeoprocessing.convolve_2d( + (signal_path, 1), (kernel_path, 1), target_path, + max_timeout=0.5) + def test_calculate_slope(self): """PGP.geoprocessing: test calculate slope.""" n_pixels = 9 From fbc5661702371a6ae1b157b749be6532155996da Mon Sep 17 00:00:00 2001 From: James Douglass Date: Wed, 9 Oct 2024 16:32:01 -0700 Subject: [PATCH 03/10] Fixing error messages in tests to match later GDAL. RE:#360 --- tests/test_geoprocessing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_geoprocessing.py b/tests/test_geoprocessing.py index 15671c6e..50f258d1 100644 --- a/tests/test_geoprocessing.py +++ b/tests/test_geoprocessing.py @@ -4048,7 +4048,7 @@ def test_get_raster_info_error_handling(self): not_a_raster_file.write("this is not a raster.\n") with self.assertRaises(RuntimeError) as cm: pygeoprocessing.get_raster_info(not_a_raster_path) - self.assertIn('not recognized as a supported file format', str(cm.exception)) + self.assertIn('supported file format', str(cm.exception)) def test_get_vector_info_error_handling(self): """PGP: test that bad data raise good errors in get_vector_info.""" @@ -4065,7 +4065,7 @@ def test_get_vector_info_error_handling(self): not_a_vector_file.write("this is not a vector.\n") with self.assertRaises(RuntimeError) as cm: pygeoprocessing.get_vector_info(not_a_vector_path) - self.assertIn('not recognized as a supported file format', str(cm.exception)) + self.assertIn('supported file format', str(cm.exception)) def test_merge_bounding_box_list(self): """PGP: test merge_bounding_box_list.""" From e9d99e05f0bc2778987afcb2f0bf2e9de33be26c Mon Sep 17 00:00:00 2001 From: James Douglass Date: Wed, 9 Oct 2024 16:34:33 -0700 Subject: [PATCH 04/10] Noting changes in HISTORY. RE:#360 --- HISTORY.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/HISTORY.rst b/HISTORY.rst index 6662f102..e9d03111 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -7,6 +7,11 @@ Release History ------------------ * Updating for numpy 2.0 API changes. Pygeoprocessing is now compatible with numpy 2.0 and later. https://github.com/natcap/pygeoprocessing/issues/396 +* Fixed an issue in ``convolve_2d`` where a long-running convolution would + raise a cryptic exception involving ``queue.Empty``. This will instead now + raise ``RuntimeError`` with a more helpful exception message. We also fixed + an issue where the ``max_timeout`` parameter of ``convolve_2d`` was unused, + so it is now used correctly. https://github.com/natcap/pygeoprocessing/issues/360 2.4.4 (2024-05-21) ------------------ From 74ff09828f74ebe98aaa0a0d09ad5dd4860febc9 Mon Sep 17 00:00:00 2001 From: James Douglass Date: Wed, 9 Oct 2024 16:53:43 -0700 Subject: [PATCH 05/10] Closing GDAL objects before raising an exception. RE:#360 --- src/pygeoprocessing/geoprocessing.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/pygeoprocessing/geoprocessing.py b/src/pygeoprocessing/geoprocessing.py index 43d47975..f1e769dc 100644 --- a/src/pygeoprocessing/geoprocessing.py +++ b/src/pygeoprocessing/geoprocessing.py @@ -3290,6 +3290,9 @@ def _fill_work_queue(): try: write_payload = write_queue.get(timeout=max_timeout) except queue.Empty: + signal_raster = signal_band = None + target_raster = target_band = None + mask_raster = mask_band = None raise RuntimeError( f"The convolution worker timed out after {max_timeout} " "seconds. Either the timeout is too low for the " From 2fd1df717151f1ab3fedb251b87295783cdb7ac6 Mon Sep 17 00:00:00 2001 From: James Douglass Date: Thu, 10 Oct 2024 12:05:57 -0700 Subject: [PATCH 06/10] Wrapping both queue operations in exception handling. RE:#360 --- src/pygeoprocessing/geoprocessing.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/pygeoprocessing/geoprocessing.py b/src/pygeoprocessing/geoprocessing.py index f1e769dc..c539f260 100644 --- a/src/pygeoprocessing/geoprocessing.py +++ b/src/pygeoprocessing/geoprocessing.py @@ -3289,23 +3289,24 @@ def _fill_work_queue(): # ``_convolve_2d_worker`` has crashed. try: write_payload = write_queue.get(timeout=max_timeout) + if write_payload: + (index_dict, result, mask_result, + left_index_raster, right_index_raster, + top_index_raster, bottom_index_raster, + left_index_result, right_index_result, + top_index_result, bottom_index_result) = write_payload + else: + worker.join(max_timeout) + break except queue.Empty: signal_raster = signal_band = None target_raster = target_band = None mask_raster = mask_band = None + LOGGER.exception("Worker timeout") raise RuntimeError( f"The convolution worker timed out after {max_timeout} " "seconds. Either the timeout is too low for the " - "size of your data, or the worker has crashed.") - if write_payload: - (index_dict, result, mask_result, - left_index_raster, right_index_raster, - top_index_raster, bottom_index_raster, - left_index_result, right_index_result, - top_index_result, bottom_index_result) = write_payload - else: - worker.join(max_timeout) - break + "size of your data, or the worker has crashed") output_array = numpy.empty( (index_dict['win_ysize'], index_dict['win_xsize']), From 945ad09feddfa2d99fefa39cb830de0782be8b24 Mon Sep 17 00:00:00 2001 From: James Douglass Date: Thu, 10 Oct 2024 13:03:11 -0700 Subject: [PATCH 07/10] Waiting 2s for the worker thread to finish. RE:#360 --- tests/test_geoprocessing.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_geoprocessing.py b/tests/test_geoprocessing.py index 50f258d1..3a3bc5ed 100644 --- a/tests/test_geoprocessing.py +++ b/tests/test_geoprocessing.py @@ -3437,6 +3437,10 @@ def test_convolve_2d_error_on_worker_timeout(self): (signal_path, 1), (kernel_path, 1), target_path, max_timeout=0.5) + # Wait for the worker thread to catch up + # Hacky, but should be enough to avoid test failures. + time.sleep(2) + def test_calculate_slope(self): """PGP.geoprocessing: test calculate slope.""" n_pixels = 9 From 430956b6b598350df9763e3d1a82db910cfd1a2e Mon Sep 17 00:00:00 2001 From: James Douglass Date: Thu, 10 Oct 2024 13:04:36 -0700 Subject: [PATCH 08/10] Attempting to shut down the work queue early. RE:#360 --- src/pygeoprocessing/geoprocessing.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/pygeoprocessing/geoprocessing.py b/src/pygeoprocessing/geoprocessing.py index c539f260..3bed89e0 100644 --- a/src/pygeoprocessing/geoprocessing.py +++ b/src/pygeoprocessing/geoprocessing.py @@ -3299,6 +3299,13 @@ def _fill_work_queue(): worker.join(max_timeout) break except queue.Empty: + # Shut down the worker thread. + # The work queue only has 10 items in it at a time, so it's pretty + # likely that we can preemptively shut it down by adding a ``None`` + # here and then have the queue not take too much longer to quit. + work_queue.put(None) + + # Close thread-local raster objects signal_raster = signal_band = None target_raster = target_band = None mask_raster = mask_band = None From d75ad4bfff7d4f3103bc0aa9e5c1f04d31ada82e Mon Sep 17 00:00:00 2001 From: James Douglass Date: Thu, 10 Oct 2024 13:09:27 -0700 Subject: [PATCH 09/10] Tuning down the sleep time in the new test. RE:#360 --- tests/test_geoprocessing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_geoprocessing.py b/tests/test_geoprocessing.py index 3a3bc5ed..a8d3c973 100644 --- a/tests/test_geoprocessing.py +++ b/tests/test_geoprocessing.py @@ -3439,7 +3439,7 @@ def test_convolve_2d_error_on_worker_timeout(self): # Wait for the worker thread to catch up # Hacky, but should be enough to avoid test failures. - time.sleep(2) + time.sleep(0.5) def test_calculate_slope(self): """PGP.geoprocessing: test calculate slope.""" From 8f6c0f144746d4a88e6a44d889210ff3303483be Mon Sep 17 00:00:00 2001 From: James Douglass Date: Thu, 10 Oct 2024 13:16:19 -0700 Subject: [PATCH 10/10] Correcting HISTORY log after a merge. RE:#360 --- HISTORY.rst | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 7334f5dd..dd0f205d 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -5,17 +5,17 @@ Unreleased Changes ------------------ * Removing the ``numpy<2`` constraint for requirements.txt that should have been included in the 2.4.5 release. https://github.com/natcap/pygeoprocessing/issues/396 - -2.4.5 (2024-10-08) ------------------- -* Updating for numpy 2.0 API changes. Pygeoprocessing is now compatible with - numpy 2.0 and later. https://github.com/natcap/pygeoprocessing/issues/396 * Fixed an issue in ``convolve_2d`` where a long-running convolution would raise a cryptic exception involving ``queue.Empty``. This will instead now raise ``RuntimeError`` with a more helpful exception message. We also fixed an issue where the ``max_timeout`` parameter of ``convolve_2d`` was unused, so it is now used correctly. https://github.com/natcap/pygeoprocessing/issues/360 +2.4.5 (2024-10-08) +------------------ +* Updating for numpy 2.0 API changes. Pygeoprocessing is now compatible with + numpy 2.0 and later. https://github.com/natcap/pygeoprocessing/issues/396 + 2.4.4 (2024-05-21) ------------------ * Our github actions for building python distributions now use