Skip to content

Commit

Permalink
Pass scratch directory to underlying unwrapper (isce-framework#39)
Browse files Browse the repository at this point in the history
`tophu.multiscale_unwrap()` may be passed the path of a scratch
directory in which to put its intermediate outputs. Previously, it did
not communicate this to the underlying unwrap callback functor, though,
so the unwrapper would just put its scratch outputs in the default
temporary directory.

This is undesirable because:

* The user may want control over where all scratch outputs are written
  in order to avoid writing to drives with low bandwidth or low
  available disk space.
* The user may want intermediate outputs to persist in a known location
  after unwrapping for debugging purposes.

This update causes the scratch directory location to be propagated to
the underlying unwrapper in order to address this issue.
  • Loading branch information
gmgunter authored Oct 10, 2023
1 parent 66df96f commit 66ebd64
Show file tree
Hide file tree
Showing 4 changed files with 301 additions and 110 deletions.
133 changes: 96 additions & 37 deletions src/tophu/_multiscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import os
import warnings
from tempfile import NamedTemporaryFile
from pathlib import Path
from tempfile import NamedTemporaryFile, mkdtemp

import dask.array as da
import numpy as np
Expand Down Expand Up @@ -204,13 +205,63 @@ def check_congruence(
return wrapped_hires + 2.0 * np.pi * diff_cycles_hires


def unwrap_in_tmpdir(
igram: NDArray[np.complexfloating],
coherence: NDArray[np.floating],
*,
unwrap_func: UnwrapCallback,
nlooks: float,
dir: Path,
) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]:
"""
Perform phase unwrapping in a uniquely-named subdirectory.
This function acts as a wrapper around `unwrap_func`. It creates a new unique
subdirectory in `dir` to be used as a scratch directory for phase unwrapping. As
long as `unwrap_func` only writes to the scratch directory, data races will not
occur when `unwrap_in_tmpdir` is executed in parallel.
The scratch directory does not get automatically cleaned up after processing.
Parameters
----------
igram : numpy.ndarray
Input interferogram.
coherence : numpy.ndarray
Sample coherence coefficient, normalized to the interval [0, 1], with the
same shape as the input interferogram.
unwrap_func : UnwrapCallback
A callback function used to unwrap the interferogram.
nlooks : float
Effective number of spatial looks used to form the input coherence data.
dir : pathlib.Path
Parent directory to create the scratch directory inside. Must be an existing
file system directory.
Returns
-------
unwphase : numpy.ndarray
Unwrapped phase, in radians.
conncomp : numpy.ndarray
Connected component labels, with the same shape as the unwrapped phase.
"""
scratchdir = Path(mkdtemp(dir=os.fspath(dir)))
return unwrap_func(
igram=igram,
coherence=coherence,
nlooks=nlooks,
scratchdir=scratchdir,
)


def coarse_unwrap(
igram: da.Array,
coherence: da.Array,
nlooks: float,
unwrap_func: UnwrapCallback,
downsample_factor: tuple[int, int],
*,
scratchdir: Path,
do_lowpass_filter: bool = True,
shape_factor: float = 1.5,
overhang: float = 0.5,
Expand Down Expand Up @@ -242,6 +293,9 @@ def coarse_unwrap(
downsample_factor : tuple of int
The number of looks to take along each axis in order to form the low-resolution
interferogram.
scratchdir : pathlib.Path
Scratch directory where intermediate processing artifacts may be written. Must
be an existing file system directory.
do_lowpass_filter : bool, optional
If True, apply a low-pass pre-filter prior to multilooking in order to reduce
aliasing effects. Defaults to True.
Expand Down Expand Up @@ -329,10 +383,12 @@ def to_single_chunk(arr: ArrayLike) -> da.Array:

# Unwrap the downsampled data.
unwrapped_lores, conncomp_lores = _util.map_blocks(
unwrap_func,
unwrap_in_tmpdir,
igram_lores_singleblock,
coherence_lores_singleblock,
unwrap_func=unwrap_func,
nlooks=nlooks_lores,
dir=scratchdir,
meta=(np.empty((), dtype=np.float32), np.empty((), dtype=np.uint32)),
)

Expand Down Expand Up @@ -582,45 +638,48 @@ def multiscale_unwrap(
igram_ = da.from_array(igram, chunks=chunksize, asarray=True)
coherence_ = da.from_array(coherence, chunks=chunksize, asarray=True)

# Get a coarse estimate of the unwrapped phase using a low-resolution copy of the
# interferogram.
coarse_unwrapped, coarse_conncomp = coarse_unwrap(
igram=igram_,
coherence=coherence_,
nlooks=nlooks,
unwrap_func=unwrap_func,
downsample_factor=downsample_factor,
do_lowpass_filter=do_lowpass_filter,
shape_factor=shape_factor,
overhang=overhang,
ripple=ripple,
attenuation=attenuation,
)

# Unwrap each (high-resolution) tile independently.
tiled_unwrapped, tiled_conncomp = _util.map_blocks(
unwrap_func,
igram_,
coherence_,
nlooks=nlooks,
meta=(np.empty((), dtype=np.float32), np.empty((), dtype=np.uint32)),
)

# Add or subtract multiples of 2pi to each connected component to minimize the mean
# discrepancy between the high-res and low-res unwrapped phase (in order to correct
# for phase discontinuities between adjacent tiles).
multiscale_unwrapped = da.map_blocks(
adjust_conncomp_offset_cycles,
tiled_unwrapped,
tiled_conncomp,
coarse_unwrapped,
coarse_conncomp,
)

# Create the scratch directory if it didn't exist. If no scratch directory was
# supplied, create a temporary directory that will be cleaned up automatically when
# the context block is exited.
with _util.scratch_directory(scratchdir) as d:
# Get a coarse estimate of the unwrapped phase using a low-resolution copy of
# the interferogram.
coarse_unwrapped, coarse_conncomp = coarse_unwrap(
igram=igram_,
coherence=coherence_,
nlooks=nlooks,
unwrap_func=unwrap_func,
downsample_factor=downsample_factor,
scratchdir=d,
do_lowpass_filter=do_lowpass_filter,
shape_factor=shape_factor,
overhang=overhang,
ripple=ripple,
attenuation=attenuation,
)

# Unwrap each (high-resolution) tile independently.
tiled_unwrapped, tiled_conncomp = _util.map_blocks(
unwrap_in_tmpdir,
igram_,
coherence_,
unwrap_func=unwrap_func,
nlooks=nlooks,
dir=d,
meta=(np.empty((), dtype=np.float32), np.empty((), dtype=np.uint32)),
)

# Add or subtract multiples of 2pi to each connected component to minimize the
# mean discrepancy between the high-res and low-res unwrapped phase (in order to
# correct for phase discontinuities between adjacent tiles).
multiscale_unwrapped = da.map_blocks(
adjust_conncomp_offset_cycles,
tiled_unwrapped,
tiled_conncomp,
coarse_unwrapped,
coarse_conncomp,
)

# Create temporary files to store the intermediate connected component labels
# from coarse & tiled unwrapping.
coarse_conncomp_tmpfile = unique_binary_file(
Expand Down
138 changes: 68 additions & 70 deletions src/tophu/_unwrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import warnings
from os import PathLike
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Literal, Protocol, runtime_checkable

import isce3
Expand Down Expand Up @@ -36,6 +35,7 @@ def __call__(
igram: NDArray[np.complexfloating],
coherence: NDArray[np.floating],
nlooks: float,
scratchdir: Path,
) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]:
"""
Perform two-dimensional phase unwrapping.
Expand All @@ -49,6 +49,8 @@ def __call__(
same shape as the input interferogram.
nlooks : float
Effective number of spatial looks used to form the input coherence data.
scratchdir : pathlib.Path
Scratch directory where intermediate processing artifacts may be written.
Returns
-------
Expand Down Expand Up @@ -112,6 +114,7 @@ def __call__(
igram: NDArray[np.complexfloating],
coherence: NDArray[np.floating],
nlooks: float,
scratchdir: Path,
) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]:
# Convert input arrays to GDAL rasters with the expected datatypes.
igram_data = np.asanyarray(igram, dtype=np.complex64)
Expand All @@ -137,6 +140,7 @@ def __call__(
cost=self.cost,
cost_params=self.cost_params,
init_method=self.init_method,
scratchdir=scratchdir,
)

return unwphase, conncomp
Expand Down Expand Up @@ -396,6 +400,7 @@ def __call__(
igram: NDArray[np.complexfloating],
coherence: NDArray[np.floating],
nlooks: float,
scratchdir: Path,
) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]:
# Configure ICU to unwrap the interferogram as a single tile (no bootstrapping).
icu = isce3.unwrap.ICU(
Expand All @@ -412,38 +417,34 @@ def __call__(
min_cc_area=self.min_conncomp_area_frac,
)

# Create a temporary scratch directory to store intermediate rasters.
with TemporaryDirectory() as tmpdir:
d = Path(tmpdir)

# Convert input arrays into rasters.
igram_raster = to_geotiff(d / "igram.tif", igram)
coherence_raster = to_geotiff(d / "coherence.tif", coherence)

# Create zero-filled rasters to store output data.
unwphase_raster = create_geotiff(
d / "unwphase.tif",
width=igram.shape[1],
length=igram.shape[0],
dtype=np.float32,
)
conncomp_raster = create_geotiff(
d / "conncomp.tif",
width=igram.shape[1],
length=igram.shape[0],
dtype=np.uint8,
)

# Run ICU.
icu.unwrap(unwphase_raster, conncomp_raster, igram_raster, coherence_raster)

# Ensure changes to output rasters are flushed to disk and close the files.
del unwphase_raster
del conncomp_raster

# Read output rasters into in-memory arrays.
unwphase = read_raster(d / "unwphase.tif")
conncomp = read_raster(d / "conncomp.tif")
# Convert input arrays into rasters.
igram_raster = to_geotiff(scratchdir / "igram.tif", igram)
coherence_raster = to_geotiff(scratchdir / "coherence.tif", coherence)

# Create zero-filled rasters to store output data.
unwphase_raster = create_geotiff(
scratchdir / "unwphase.tif",
width=igram.shape[1],
length=igram.shape[0],
dtype=np.float32,
)
conncomp_raster = create_geotiff(
scratchdir / "conncomp.tif",
width=igram.shape[1],
length=igram.shape[0],
dtype=np.uint8,
)

# Run ICU.
icu.unwrap(unwphase_raster, conncomp_raster, igram_raster, coherence_raster)

# Ensure changes to output rasters are flushed to disk and close the files.
del unwphase_raster
del conncomp_raster

# Read output rasters into in-memory arrays.
unwphase = read_raster(scratchdir / "unwphase.tif")
conncomp = read_raster(scratchdir / "conncomp.tif")

return unwphase, conncomp

Expand Down Expand Up @@ -490,6 +491,7 @@ def __call__(
igram: NDArray[np.complexfloating],
coherence: NDArray[np.floating],
nlooks: float,
scratchdir: Path,
) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]:
# Configure ICU to unwrap the interferogram as a single tile (no bootstrapping).
phass = isce3.unwrap.Phass(
Expand All @@ -501,42 +503,38 @@ def __call__(
# Get wrapped phase.
wphase = np.angle(igram)

# Create a temporary scratch directory to store intermediate rasters.
with TemporaryDirectory() as tmpdir:
d = Path(tmpdir)

# Convert input arrays into rasters.
wphase_raster = to_geotiff(d / "wphase.tif", wphase)
coherence_raster = to_geotiff(d / "coherence.tif", coherence)

# Create zero-filled rasters to store output data.
unwphase_raster = create_geotiff(
d / "unwphase.tif",
width=igram.shape[1],
length=igram.shape[0],
dtype=np.float32,
)
conncomp_raster = create_geotiff(
d / "conncomp.tif",
width=igram.shape[1],
length=igram.shape[0],
dtype=np.uint32,
)

# Run PHASS.
phass.unwrap(
wphase_raster,
coherence_raster,
unwphase_raster,
conncomp_raster,
)

# Ensure changes to output rasters are flushed to disk and close the files.
del unwphase_raster
del conncomp_raster

# Read output rasters into in-memory arrays.
unwphase = read_raster(d / "unwphase.tif")
conncomp = read_raster(d / "conncomp.tif")
# Convert input arrays into rasters.
wphase_raster = to_geotiff(scratchdir / "wphase.tif", wphase)
coherence_raster = to_geotiff(scratchdir / "coherence.tif", coherence)

# Create zero-filled rasters to store output data.
unwphase_raster = create_geotiff(
scratchdir / "unwphase.tif",
width=igram.shape[1],
length=igram.shape[0],
dtype=np.float32,
)
conncomp_raster = create_geotiff(
scratchdir / "conncomp.tif",
width=igram.shape[1],
length=igram.shape[0],
dtype=np.uint32,
)

# Run PHASS.
phass.unwrap(
wphase_raster,
coherence_raster,
unwphase_raster,
conncomp_raster,
)

# Ensure changes to output rasters are flushed to disk and close the files.
del unwphase_raster
del conncomp_raster

# Read output rasters into in-memory arrays.
unwphase = read_raster(scratchdir / "unwphase.tif")
conncomp = read_raster(scratchdir / "conncomp.tif")

return unwphase, conncomp
Loading

0 comments on commit 66ebd64

Please sign in to comment.