Skip to content

Commit

Permalink
Merge pull request #406 from phargogh/bugfix/360-convolve2d-unhelpful…
Browse files Browse the repository at this point in the history
…-error-on-timeout

Improve exception on `convolve_2d` worker timeout
  • Loading branch information
emlys authored Nov 23, 2024
2 parents adbc80c + edf9416 commit 8982604
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 14 deletions.
7 changes: 6 additions & 1 deletion HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ Release History
------------------
* Removing the ``numpy<2`` constraint for requirements.txt that should have
been included in the 2.4.5 release. https://github.com/natcap/pygeoprocessing/issues/396
* Handling GDAL-based ``RuntimeError``s raised during ``pygeoprocessing.reproject_vector``.
* Fixed an issue in ``convolve_2d`` where a long-running convolution would
raise a cryptic exception involving ``queue.Empty``. This will instead now
raise ``RuntimeError`` with a more helpful exception message. We also fixed
an issue where the ``max_timeout`` parameter of ``convolve_2d`` was unused,
so it is now used correctly. https://github.com/natcap/pygeoprocessing/issues/360
* Handling GDAL-based ``RuntimeError`` raised during ``pygeoprocessing.reproject_vector``.
https://github.com/natcap/pygeoprocessing/issues/409

2.4.5 (2024-10-08)
Expand Down
44 changes: 32 additions & 12 deletions src/pygeoprocessing/geoprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@
from .geoprocessing_core import DEFAULT_CREATION_OPTIONS
from .geoprocessing_core import DEFAULT_GTIFF_CREATION_TUPLE_OPTIONS
from .geoprocessing_core import DEFAULT_OSR_AXIS_MAPPING_STRATEGY
from .geoprocessing_core import gdal_use_exceptions
from .geoprocessing_core import GDALUseExceptions
from .geoprocessing_core import INT8_CREATION_OPTIONS
from .geoprocessing_core import gdal_use_exceptions, GDALUseExceptions

# This is used to efficiently pass data to the raster stats worker if available
if sys.version_info >= (3, 8):
Expand Down Expand Up @@ -3294,16 +3295,33 @@ def _fill_work_queue():
while True:
# the timeout guards against a worst case scenario where the
# ``_convolve_2d_worker`` has crashed.
write_payload = write_queue.get(timeout=_MAX_TIMEOUT)
if write_payload:
(index_dict, result, mask_result,
left_index_raster, right_index_raster,
top_index_raster, bottom_index_raster,
left_index_result, right_index_result,
top_index_result, bottom_index_result) = write_payload
else:
worker.join(max_timeout)
break
try:
write_payload = write_queue.get(timeout=max_timeout)
if write_payload:
(index_dict, result, mask_result,
left_index_raster, right_index_raster,
top_index_raster, bottom_index_raster,
left_index_result, right_index_result,
top_index_result, bottom_index_result) = write_payload
else:
worker.join(max_timeout)
break
except queue.Empty:
# Shut down the worker thread.
# The work queue only has 10 items in it at a time, so it's pretty
# likely that we can preemptively shut it down by adding a ``None``
# here and then have the worker not take too much longer to quit.
work_queue.put(None)

# Close thread-local raster objects
signal_raster = signal_band = None
target_raster = target_band = None
mask_raster = mask_band = None
LOGGER.exception("Worker timeout")
raise RuntimeError(
f"The convolution worker timed out after {max_timeout} "
"seconds. Either the timeout is too low for the "
"size of your data, or the worker has crashed")

output_array = numpy.empty(
(index_dict['win_ysize'], index_dict['win_xsize']),
Expand Down Expand Up @@ -3833,7 +3851,9 @@ def get_gis_type(path):
``pygeoprocessing.RASTER_TYPE``, or ``pygeoprocessing.VECTOR_TYPE``.
"""
from pygeoprocessing import UNKNOWN_TYPE, RASTER_TYPE, VECTOR_TYPE
from pygeoprocessing import RASTER_TYPE
from pygeoprocessing import UNKNOWN_TYPE
from pygeoprocessing import VECTOR_TYPE
gis_type = UNKNOWN_TYPE
try:
gis_raster = gdal.OpenEx(path, gdal.OF_RASTER)
Expand Down
27 changes: 26 additions & 1 deletion tests/test_geoprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
from pygeoprocessing.geoprocessing_core import DEFAULT_CREATION_OPTIONS
from pygeoprocessing.geoprocessing_core import \
DEFAULT_GTIFF_CREATION_TUPLE_OPTIONS
from pygeoprocessing.geoprocessing_core import gdal_use_exceptions
from pygeoprocessing.geoprocessing_core import INT8_CREATION_OPTIONS
from pygeoprocessing.geoprocessing_core import \
INT8_GTIFF_CREATION_TUPLE_OPTIONS
from pygeoprocessing.geoprocessing_core import gdal_use_exceptions

_DEFAULT_ORIGIN = (444720, 3751320)
_DEFAULT_PIXEL_SIZE = (30, -30)
Expand Down Expand Up @@ -3458,6 +3458,31 @@ def test_convolve_2d_ignore_undefined_nodata(self):
numpy.isclose(signal_nodata_array, signal_nodata_none_array).all(),
'signal with nodata should be the same as signal with none')

def test_convolve_2d_error_on_worker_timeout(self):
    """PGP.geoprocessing: test convolve 2d error when worker times out.

    Convolves a large constant-valued signal with an even larger kernel
    while passing a very short ``max_timeout``; the call is expected to
    raise ``RuntimeError``.
    """
    signal_size = 10000
    kernel_size = 17500
    fill_value = 0.5
    nodata = -1

    # Signal raster: every pixel holds the same value.
    signal_path = os.path.join(self.workspace_dir, 'signal.tif')
    _array_to_raster(
        numpy.full((signal_size, signal_size), fill_value, numpy.float32),
        nodata, signal_path)

    # Kernel raster: all zeros except for a single 1 at the center pixel.
    kernel = numpy.zeros((kernel_size, kernel_size), numpy.float32)
    center = kernel_size // 2
    kernel[center, center] = 1
    kernel_path = os.path.join(self.workspace_dir, 'kernel.tif')
    _array_to_raster(kernel, nodata, kernel_path)

    target_path = os.path.join(self.workspace_dir, 'target.tif')
    with self.assertRaises(RuntimeError):
        pygeoprocessing.convolve_2d(
            (signal_path, 1), (kernel_path, 1), target_path,
            max_timeout=0.5)

    # Wait for the worker thread to catch up
    # Hacky, but should be enough to avoid test failures.
    time.sleep(0.5)

def test_calculate_slope(self):
"""PGP.geoprocessing: test calculate slope."""
n_pixels = 9
Expand Down

0 comments on commit 8982604

Please sign in to comment.