Skip to content

Commit

Permalink
Refactoring executor string parsing to a separate function. RE:natcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
phargogh committed Apr 4, 2024
1 parent cf4eba7 commit 398dd29
Showing 1 changed file with 26 additions and 15 deletions.
41 changes: 26 additions & 15 deletions src/pygeoprocessing/geoprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,23 @@ def __exit__(self, *args, **kwargs):
LOGGER.debug(str(args) + str(kwargs))


def _parse_executor(executor_obj):
if executor_obj:
if isinstance(executor_obj, concurrent.futures.Executor):
pass
elif executor_obj.startswith('multiprocessing'): # e.g. multiprocessing:4
executor_obj = concurrent.futures.ProcessPoolExecutor(
int(executor_obj.split(':')[-1]))
elif executor_obj.startswith('threading'): # e.g. threading:4
executor_obj = concurrent.futures.ThreadPoolExecutor(
int(executor_obj.split(':')[-1]))
else:
raise ValueError("bad executor string")
else: # no executor, so use _SynchronousExecutor
executor_obj = _SynchronousExecutor()
return executor_obj


LOGGER = logging.getLogger(__name__)

# Used in joining finished TaskGraph Tasks.
Expand Down Expand Up @@ -209,7 +226,8 @@ def raster_calculator(
datatype_target, nodata_target,
calc_raster_stats=True, use_shared_memory=False,
largest_block=_LARGEST_ITERBLOCK, max_timeout=_MAX_TIMEOUT,
raster_driver_creation_tuple=DEFAULT_GTIFF_CREATION_TUPLE_OPTIONS):
raster_driver_creation_tuple=DEFAULT_GTIFF_CREATION_TUPLE_OPTIONS,
executor=None):
"""Apply local a raster operation on a stack of rasters.
This function applies a user defined function across a stack of
Expand Down Expand Up @@ -527,6 +545,10 @@ def raster_calculator(
pixels_processed = 0
n_pixels = n_cols * n_rows

executor = _parse_executor(executor)

submitted_data = []

# iterate over each block and calculate local_op
for block_offset in block_offset_list:
# read input blocks
Expand Down Expand Up @@ -563,6 +585,8 @@ def raster_calculator(
# must be a raw tuple
data_blocks.append(value[0])

submitted_data = (
block_offset, executor.submit(local_op, *data_blocks))
target_block = local_op(*data_blocks)

if (not isinstance(target_block, numpy.ndarray) or
Expand Down Expand Up @@ -1215,20 +1239,7 @@ def align_and_resize_raster_stack(
if 'mask_vector_where_filter' in mask_options
else None))

if executor:
if isinstance(executor, concurrent.futures.Executor):
pass
elif executor.startswith('multiprocessing'): # e.g. multiprocessing:4
executor = concurrent.futures.ProcessPoolExecutor(
int(executor.split(':')[-1]))
elif executor.startswith('threading'): # e.g. threading:4
executor = concurrent.futures.ThreadPoolExecutor(
int(executor.split(':')[-1]))
else:
raise ValueError("bad executor string")
else: # no executor, so use _SynchronousExecutor
executor = _SynchronousExecutor()

executor = _parse_executor(executor)
LOGGER.debug(executor)

with executor as my_executor:
Expand Down

0 comments on commit 398dd29

Please sign in to comment.