Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve exception on convolve_2d worker timeout #406

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ Release History
------------------
* Removing the ``numpy<2`` constraint for requirements.txt that should have
been included in the 2.4.5 release. https://github.com/natcap/pygeoprocessing/issues/396
* Handling GDAL-based ``RuntimeError``s raised during ``pygeoprocessing.reproject_vector``.
* Fixed an issue in ``convolve_2d`` where a long-running convolution would
raise a cryptic exception involving ``queue.Empty``. It now raises
``RuntimeError`` with a more helpful exception message instead. We also fixed
an issue where the ``max_timeout`` parameter of ``convolve_2d`` was ignored;
it is now used as the timeout. https://github.com/natcap/pygeoprocessing/issues/360
* Handling GDAL-based ``RuntimeError`` raised during ``pygeoprocessing.reproject_vector``.
https://github.com/natcap/pygeoprocessing/issues/409

2.4.5 (2024-10-08)
Expand Down
44 changes: 32 additions & 12 deletions src/pygeoprocessing/geoprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@
from .geoprocessing_core import DEFAULT_CREATION_OPTIONS
from .geoprocessing_core import DEFAULT_GTIFF_CREATION_TUPLE_OPTIONS
from .geoprocessing_core import DEFAULT_OSR_AXIS_MAPPING_STRATEGY
from .geoprocessing_core import gdal_use_exceptions
from .geoprocessing_core import GDALUseExceptions
from .geoprocessing_core import INT8_CREATION_OPTIONS
from .geoprocessing_core import gdal_use_exceptions, GDALUseExceptions

# This is used to efficiently pass data to the raster stats worker if available
if sys.version_info >= (3, 8):
Expand Down Expand Up @@ -3294,16 +3295,33 @@ def _fill_work_queue():
while True:
# the timeout guards against a worst case scenario where the
# ``_convolve_2d_worker`` has crashed.
write_payload = write_queue.get(timeout=_MAX_TIMEOUT)
if write_payload:
(index_dict, result, mask_result,
left_index_raster, right_index_raster,
top_index_raster, bottom_index_raster,
left_index_result, right_index_result,
top_index_result, bottom_index_result) = write_payload
else:
worker.join(max_timeout)
break
try:
write_payload = write_queue.get(timeout=max_timeout)
if write_payload:
(index_dict, result, mask_result,
left_index_raster, right_index_raster,
top_index_raster, bottom_index_raster,
left_index_result, right_index_result,
top_index_result, bottom_index_result) = write_payload
else:
worker.join(max_timeout)
break
except queue.Empty:
# Shut down the worker thread.
# The work queue only has 10 items in it at a time, so it's pretty
# likely that we can preemptively shut the worker down by adding a
# ``None`` here and then have the worker not take too much longer to quit.
work_queue.put(None)

# Close thread-local raster objects
signal_raster = signal_band = None
target_raster = target_band = None
mask_raster = mask_band = None
LOGGER.exception("Worker timeout")
raise RuntimeError(
f"The convolution worker timed out after {max_timeout} "
"seconds. Either the timeout is too low for the "
"size of your data, or the worker has crashed")

output_array = numpy.empty(
(index_dict['win_ysize'], index_dict['win_xsize']),
Expand Down Expand Up @@ -3833,7 +3851,9 @@ def get_gis_type(path):
``pygeoprocessing.RASTER_TYPE``, or ``pygeoprocessing.VECTOR_TYPE``.

"""
from pygeoprocessing import UNKNOWN_TYPE, RASTER_TYPE, VECTOR_TYPE
from pygeoprocessing import RASTER_TYPE
from pygeoprocessing import UNKNOWN_TYPE
from pygeoprocessing import VECTOR_TYPE
gis_type = UNKNOWN_TYPE
try:
gis_raster = gdal.OpenEx(path, gdal.OF_RASTER)
Expand Down
27 changes: 26 additions & 1 deletion tests/test_geoprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
from pygeoprocessing.geoprocessing_core import DEFAULT_CREATION_OPTIONS
from pygeoprocessing.geoprocessing_core import \
DEFAULT_GTIFF_CREATION_TUPLE_OPTIONS
from pygeoprocessing.geoprocessing_core import gdal_use_exceptions
from pygeoprocessing.geoprocessing_core import INT8_CREATION_OPTIONS
from pygeoprocessing.geoprocessing_core import \
INT8_GTIFF_CREATION_TUPLE_OPTIONS
from pygeoprocessing.geoprocessing_core import gdal_use_exceptions

_DEFAULT_ORIGIN = (444720, 3751320)
_DEFAULT_PIXEL_SIZE = (30, -30)
Expand Down Expand Up @@ -3458,6 +3458,31 @@ def test_convolve_2d_ignore_undefined_nodata(self):
numpy.isclose(signal_nodata_array, signal_nodata_none_array).all(),
'signal with nodata should be the same as signal with none')

def test_convolve_2d_error_on_worker_timeout(self):
"""PGP.geoprocessing: test convolve 2d error when worker times out."""
# A large signal and an even larger kernel, combined with the very short
# ``max_timeout`` passed below. NOTE(review): presumably sized so that the
# convolution cannot finish before the timeout expires -- confirm before
# shrinking these.
n_pixels = 10000
n_kernel_pixels = 17500
signal_array = numpy.ones((n_pixels, n_pixels), numpy.float32)
test_value = 0.5
# Every signal pixel is overwritten with ``test_value``.
signal_array[:] = test_value
target_nodata = -1
signal_path = os.path.join(self.workspace_dir, 'signal.tif')
_array_to_raster(signal_array, target_nodata, signal_path)
kernel_path = os.path.join(self.workspace_dir, 'kernel.tif')
# The kernel is all zeros except for a single 1 at the middle index.
kernel_array = numpy.zeros(
(n_kernel_pixels, n_kernel_pixels), numpy.float32)
kernel_array[int(n_kernel_pixels/2), int(n_kernel_pixels/2)] = 1
_array_to_raster(kernel_array, target_nodata, kernel_path)
target_path = os.path.join(self.workspace_dir, 'target.tif')
# With a 0.5 second ``max_timeout`` the call is expected to fail with
# ``RuntimeError``.
with self.assertRaises(RuntimeError):
pygeoprocessing.convolve_2d(
(signal_path, 1), (kernel_path, 1), target_path,
max_timeout=0.5)

# Wait for the worker thread to catch up after the timeout.
# Hacky: a fixed sleep is a heuristic, not a guarantee, but it should be
# enough to avoid test failures.
time.sleep(0.5)

def test_calculate_slope(self):
"""PGP.geoprocessing: test calculate slope."""
n_pixels = 9
Expand Down