Skip to content

Commit

Permalink
Merge pull request natcap#389 from phargogh/feature/386-add-decaying-…
Browse files Browse the repository at this point in the history
…flow-accumulation

Implement decaying D8 flow accumulation
  • Loading branch information
dcdenu4 authored Jun 24, 2024
2 parents 0622c75 + eaedc96 commit 5e9fc84
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 18 deletions.
2 changes: 2 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Release History
called with an invalid resampling algorithm. We now fall back to the
underlying GDAL functions' error messages.
https://github.com/natcap/pygeoprocessing/issues/387
* Implementing decaying flow accumulation for D8 routing.
https://github.com/natcap/pygeoprocessing/issues/386
* Updated to Cython 3.
* Dropped support for Python 3.7.

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# This file records the packages and requirements needed in order for
# pygeoprocessing to work as expected.
GDAL>=3.0.4
numpy>=1.10.1
numpy>=1.10.1,<2.0
Rtree>=0.8.3
scipy>=0.14.1,!=0.19.1
Shapely>=1.6.4
Expand Down
139 changes: 122 additions & 17 deletions src/pygeoprocessing/routing/routing.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,24 @@ cdef struct FlowPixelType:
int last_flow_dir
double value

# This struct is used to track a value as it decays. The intent is to have the
# ``decayed_value`` updated in-place as we walk the flow graph until the value
# is less than the ``min_value``.
cdef struct DecayingValue:
double decayed_value # The value, which will be progressively updated as it decays
double min_value # The minimum value before the Decaying Value should be ignored.
# This struct is used to track an intermediate flow pixel's last calculated
# direction and flow accumulation value so far (just like with FlowPixelType).
# Additionally, we track all of the decaying values from upstream that
# influence the load on this pixel. Used during decaying flow accumulation.
cdef struct WeightedFlowPixelType:
unsigned int xi # The pixel's x coordinate in pixel space
unsigned int yi # The pixel's y coordinate in pixel space
int last_flow_dir # The last flow direction processed on this pixel
double value # The flow accumulation value at this pixel
queue[DecayingValue] decaying_values # A queue of upstream decaying values that affect the accumulation on this pixel.

# this struct is used in distance_to_channel_mfd to add up each pixel's
# weighted distances and flow weights
cdef struct MFDFlowPixelType:
Expand Down Expand Up @@ -1442,7 +1460,8 @@ def flow_dir_d8(

def flow_accumulation_d8(
flow_dir_raster_path_band, target_flow_accum_raster_path,
weight_raster_path_band=None,
weight_raster_path_band=None, custom_decay_factor=None,
double min_decay_proportion=0.001,
raster_driver_creation_tuple=DEFAULT_GTIFF_CREATION_TUPLE_OPTIONS):
"""D8 flow accumulation.
Expand All @@ -1468,6 +1487,18 @@ def flow_accumulation_d8(
weight. If ``None``, 1 is the default flow accumulation weight.
This raster must be the same dimensions as
``flow_dir_mfd_raster_path_band``.
custom_decay_factor=None (float or tuple): a custom decay factor, either
represented as a float (where the same decay factor is applied to
all valid pixels) or a path/band tuple for a raster where pixel
values represent spatially-explicit decay values. As the on-pixel
load passes through a pixel, the decay factor is applied, so you
could think of this layer as representing the proportion of a
pollutant that is absorbed by the landscape as it passes through
along the flowpath.
min_decay_proportion=0.001 (float): A value representing the minimum
decayed value that should continue to be tracked along the flow
path when using a custom decay factor. If the upstream decayed
contribution falls below this value, it is not included.
raster_driver_creation_tuple (tuple): a tuple containing a GDAL driver
name string as the first element and a GDAL creation options
tuple/list as the second. Defaults to a GTiff driver tuple
Expand Down Expand Up @@ -1505,8 +1536,8 @@ def flow_accumulation_d8(

# `search_stack` is used to walk upstream to calculate flow accumulation
# values
cdef stack[FlowPixelType] search_stack
cdef FlowPixelType flow_pixel
cdef stack[WeightedFlowPixelType] search_stack
cdef WeightedFlowPixelType flow_pixel

# properties of the parallel rasters
cdef unsigned int raster_x_size, raster_y_size
Expand Down Expand Up @@ -1552,6 +1583,29 @@ def flow_accumulation_d8(
if raw_weight_nodata is not None:
weight_nodata = raw_weight_nodata

cdef short do_decayed_accumulation = False
cdef short use_const_decay_factor = False
cdef float decay_factor = 1.0
cdef double decay_factor_nodata
if custom_decay_factor is not None:
do_decayed_accumulation = True
if isinstance(custom_decay_factor, (int, float)):
decay_factor = custom_decay_factor
use_const_decay_factor = True
else: # assume a path/band tuple
if not _is_raster_path_band_formatted(custom_decay_factor):
raise ValueError(
"%s is supposed to be a raster band tuple but it's not." % (
custom_decay_factor))
decay_factor_managed_raster = _ManagedRaster(
custom_decay_factor[0], custom_decay_factor[1], 0)
_tmp_decay_factor_nodata = pygeoprocessing.get_raster_info(
custom_decay_factor[0])['nodata'][0]
if _tmp_decay_factor_nodata is None:
decay_factor_nodata = float('nan')
else:
decay_factor_nodata = _tmp_decay_factor_nodata

flow_dir_raster_info = pygeoprocessing.get_raster_info(
flow_dir_raster_path_band[0])
raster_x_size, raster_y_size = flow_dir_raster_info['raster_size']
Expand All @@ -1564,6 +1618,8 @@ def flow_accumulation_d8(
flow_dir_nodata = tmp_flow_dir_nodata

# this outer loop searches for a pixel that is locally undrained
cdef queue[DecayingValue] decay_from_upstream
cdef double upstream_transport_factor = 1.0 # proportion of load transported to downstream pixel
for offset_dict in pygeoprocessing.iterblocks(
flow_dir_raster_path_band, offset_only=True, largest_block=0):
win_xsize = offset_dict['win_xsize']
Expand Down Expand Up @@ -1593,7 +1649,9 @@ def flow_accumulation_d8(
xi_n = -1
yi_n = -1

# search block for to set flow direction
# Search the block for root pixels.
# A root pixel is a flow direction pixel that is nodata, which means it
# may be a drain just off the edge.
for yi in range(1, win_ysize+1):
for xi in range(1, win_xsize+1):
flow_dir = flow_dir_buffer_array[yi, xi]
Expand All @@ -1615,51 +1673,98 @@ def flow_accumulation_d8(
else:
weight_val = 1.0
search_stack.push(
FlowPixelType(xi_root, yi_root, 0, weight_val))
WeightedFlowPixelType(xi_root, yi_root, 0, weight_val,
queue[DecayingValue]()))

# Drain the queue of upstream neighbors since we're starting
# from a new root pixel.
while not decay_from_upstream.empty():
decay_from_upstream.pop()

while not search_stack.empty():
flow_pixel = search_stack.top()
search_stack.pop()

preempted = 0
# Load any decaying values from upstream into the current flow pixel.
while not decay_from_upstream.empty():
upstream_decaying_value = decay_from_upstream.front()
decay_from_upstream.pop()
if upstream_decaying_value.decayed_value > upstream_decaying_value.min_value:
flow_pixel.decaying_values.push(upstream_decaying_value)

upstream_pixels_remain = 0
for i_n in range(flow_pixel.last_flow_dir, 8):
xi_n = flow_pixel.xi+D8_XOFFSET[i_n]
yi_n = flow_pixel.yi+D8_YOFFSET[i_n]
if (xi_n < 0 or xi_n >= raster_x_size or
yi_n < 0 or yi_n >= raster_y_size):
# no upstream here
# neighbor not upstream: off edges of the raster
continue
upstream_flow_dir = <int>flow_dir_managed_raster.get(
xi_n, yi_n)
if upstream_flow_dir == flow_dir_nodata or (
upstream_flow_dir !=
D8_REVERSE_DIRECTION[i_n]):
# no upstream here
upstream_flow_dir != D8_REVERSE_DIRECTION[i_n]):
# neighbor not upstream: is nodata or doesn't flow in
continue

upstream_flow_accum = <double>(
flow_accum_managed_raster.get(xi_n, yi_n))
if _is_close(upstream_flow_accum, flow_accum_nodata, 1e-8, 1e-5):
# process upstream before this one
flow_pixel.last_flow_dir = i_n
search_stack.push(flow_pixel)
# Flow accumulation pixel is nodata until it and everything
# upstream of it has been computed.
if weight_raster is not None:
weight_val = <double>weight_raster.get(
xi_n, yi_n)
if _is_close(weight_val, weight_nodata, 1e-8, 1e-5):
weight_val = 0.0
else:
weight_val = 1.0
if do_decayed_accumulation:
flow_pixel.decaying_values.push(
DecayingValue(weight_val,
weight_val * min_decay_proportion))

# process upstream pixel before this neighbor
flow_pixel.last_flow_dir = i_n
search_stack.push(flow_pixel)
search_stack.push(
FlowPixelType(xi_n, yi_n, 0, weight_val))
preempted = 1
WeightedFlowPixelType(xi_n, yi_n, 0, weight_val,
queue[DecayingValue]()))
upstream_pixels_remain = 1
break
flow_pixel.value += upstream_flow_accum
if not preempted:

if not do_decayed_accumulation:
flow_pixel.value += upstream_flow_accum

if not upstream_pixels_remain:
# flow_pixel.value already has the on-pixel load
# from upstream, so we just need to add it from the
# decaying values queue
if do_decayed_accumulation:
if use_const_decay_factor:
upstream_transport_factor = decay_factor
else:
upstream_transport_factor = (
decay_factor_managed_raster.get(flow_pixel.xi, flow_pixel.yi))
# If nodata, assume nothing will be transported from this pixel.
if _is_close(upstream_transport_factor, decay_factor_nodata, 1e-8, 1e-5):
upstream_transport_factor = 0.0

while not flow_pixel.decaying_values.empty():
decayed_value = flow_pixel.decaying_values.front()
flow_pixel.decaying_values.pop()
decayed_value.decayed_value *= upstream_transport_factor
flow_pixel.value += decayed_value.decayed_value
decay_from_upstream.push(decayed_value)

flow_accum_managed_raster.set(
flow_pixel.xi, flow_pixel.yi,
flow_pixel.value)
flow_accum_managed_raster.close()
flow_dir_managed_raster.close()
if do_decayed_accumulation:
if not use_const_decay_factor:
decay_factor_managed_raster.close()
if weight_raster is not None:
weight_raster.close()
LOGGER.info('Flow accumulation D8 %.1f%% complete', 100.0)
Expand Down
80 changes: 80 additions & 0 deletions tests/test_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1378,3 +1378,83 @@ def test_extract_streams_d8(self):
numpy.testing.assert_array_equal(
pygeoprocessing.raster_to_numpy_array(target_streams_path),
expected_array)

def test_flow_accum_d8_with_decay(self):
"""PGP.routing: test d8 flow accumulation with decay."""
# this was generated from a pre-calculated plateau drain dem
flow_dir_array = numpy.array([
[2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 0],
[4, 2, 2, 2, 2, 2, 2, 2, 2, 2, 0],
[4, 4, 2, 2, 2, 2, 2, 2, 2, 0, 0],
[4, 4, 4, 2, 2, 2, 2, 2, 0, 0, 0],
[4, 4, 4, 4, 2, 2, 2, 0, 0, 0, 0],
[4, 4, 4, 4, 4, 2, 0, 0, 0, 0, 0],
[4, 4, 4, 4, 4, 6, 0, 0, 0, 0, 0],
[4, 4, 4, 4, 6, 6, 6, 0, 0, 0, 0],
[4, 4, 4, 6, 6, 6, 6, 6, 0, 0, 0],
[4, 4, 6, 6, 6, 6, 6, 6, 6, 0, 0],
[4, 6, 6, 6, 6, 6, 6, 6, 6, 6, 0]], dtype=numpy.uint8)

flow_dir_path = os.path.join(self.workspace_dir, 'flow_dir.tif')
_array_to_raster(flow_dir_array, None, flow_dir_path)

target_flow_accum_path = os.path.join(
self.workspace_dir, 'flow_accum.tif')

# Test with scalar decay factor and also with a raster of scalar values
const_decay_factor = 0.5
decay_factor_path = os.path.join(
self.workspace_dir, 'decay_factor.tif')
decay_array = numpy.full(flow_dir_array.shape, const_decay_factor,
dtype=numpy.float32)
_array_to_raster(decay_array, None, decay_factor_path)

for decay_factor in (const_decay_factor, (decay_factor_path, 1)):
pygeoprocessing.routing.flow_accumulation_d8(
(flow_dir_path, 1), target_flow_accum_path,
custom_decay_factor=decay_factor)

flow_accum_array = pygeoprocessing.raster_to_numpy_array(
target_flow_accum_path)
self.assertEqual(flow_accum_array.dtype, numpy.float64)

# This array is a regression result saved by hand, but
# because this flow accumulation doesn't have any joining flow
# paths we can calculate weighted flow accumulation with the
# closed form of the summation:
# decayed_accum = 2 - decay_factor ** (flow_accum - 1)
expected_result = 2 - const_decay_factor ** (numpy.array(
[[1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1],
[1, 1, 2, 3, 4, 5, 4, 3, 2, 1, 1],
[2, 1, 1, 2, 3, 4, 3, 2, 1, 1, 2],
[3, 2, 1, 1, 2, 3, 2, 1, 1, 2, 3],
[4, 3, 2, 1, 1, 2, 1, 1, 2, 3, 4],
[5, 4, 3, 2, 1, 1, 1, 2, 3, 4, 5],
[5, 4, 3, 2, 1, 1, 1, 2, 3, 4, 5],
[4, 3, 2, 1, 1, 2, 1, 1, 2, 3, 4],
[3, 2, 1, 1, 2, 3, 2, 1, 1, 2, 3],
[2, 1, 1, 2, 3, 4, 3, 2, 1, 1, 2],
[1, 1, 2, 3, 4, 5, 4, 3, 2, 1, 1]]) - 1)

numpy.testing.assert_almost_equal(
flow_accum_array, expected_result)

def test_flow_accum_with_decay_merging_flow(self):
"""PGP.routing: test d8 flow accum with decay and merged flowpath."""
flow_dir_path = os.path.join(self.workspace_dir, 'flow_dir.tif')
_array_to_raster(
numpy.array([
[255, 0, 0, 0, 0],
[255, 0, 0, 0, 2]], dtype=numpy.uint8), 255, flow_dir_path)

flow_accum_path = os.path.join(self.workspace_dir, 'flow_accum.tif')
pygeoprocessing.routing.flow_accumulation_d8(
(flow_dir_path, 1), flow_accum_path, custom_decay_factor=0.5)

fnodata = -1.23789789e29 # copied from routing.pyx
expected_array = numpy.array([
[fnodata, 1, 1.5, 1.75, 2.8125],
[fnodata, 1, 1.5, 1.75, 1.875]], dtype=numpy.float64)
numpy.testing.assert_allclose(
pygeoprocessing.raster_to_numpy_array(flow_accum_path),
expected_array)

0 comments on commit 5e9fc84

Please sign in to comment.