From 66ebd64a82556c22f8100b733efcff37b33456b6 Mon Sep 17 00:00:00 2001 From: Geoffrey Gunter <12984092+gmgunter@users.noreply.github.com> Date: Mon, 9 Oct 2023 18:38:09 -0700 Subject: [PATCH] Pass scratch directory to underlying unwrapper (#39) `tophu.multiscale_unwrap()` may be passed the path of a scratch directory in which to put its intermediate outputs. Previously, it did not communicate this to the underlying unwrap callback functor, though, so the unwrapper would just put its scratch outputs in the default temporary directory. This is undesirable because: * The user may want control over where all scratch outputs are written in order to avoid writing to drives with low bandwidth or low available disk space. * The user may want intermediate outputs to persist in a known location after unwrapping for debugging purposes. This update causes the scratch directory location to be propagated to the underlying unwrapper in order to address this issue. --- src/tophu/_multiscale.py | 133 ++++++++++++++++++++++++++----------- src/tophu/_unwrap.py | 138 +++++++++++++++++++-------------------- test/test_multiscale.py | 52 +++++++++++++++ test/test_unwrap.py | 88 ++++++++++++++++++++++++- 4 files changed, 301 insertions(+), 110 deletions(-) diff --git a/src/tophu/_multiscale.py b/src/tophu/_multiscale.py index 9cd9619..50b12ea 100644 --- a/src/tophu/_multiscale.py +++ b/src/tophu/_multiscale.py @@ -2,7 +2,8 @@ import os import warnings -from tempfile import NamedTemporaryFile +from pathlib import Path +from tempfile import NamedTemporaryFile, mkdtemp import dask.array as da import numpy as np @@ -204,6 +205,55 @@ def check_congruence( return wrapped_hires + 2.0 * np.pi * diff_cycles_hires +def unwrap_in_tmpdir( + igram: NDArray[np.complexfloating], + coherence: NDArray[np.floating], + *, + unwrap_func: UnwrapCallback, + nlooks: float, + dir: Path, +) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: + """ + Perform phase unwrapping in a uniquely-named subdirectory. + + This function acts as a wrapper around `unwrap_func`. It creates a new unique + subdirectory in `dir` to be used as a scratch directory for phase unwrapping. As + long as `unwrap_func` only writes to the scratch directory, data races will not + occur when `unwrap_in_tmpdir` is executed in parallel. + + The scratch directory does not get automatically cleaned up after processing. + + Parameters + ---------- + igram : numpy.ndarray + Input interferogram. + coherence : numpy.ndarray + Sample coherence coefficient, normalized to the interval [0, 1], with the + same shape as the input interferogram. + unwrap_func : UnwrapCallback + A callback function used to unwrap the interferogram. + nlooks : float + Effective number of spatial looks used to form the input coherence data. + dir : pathlib.Path + Parent directory to create the scratch directory inside. Must be an existing + file system directory. + + Returns + ------- + unwphase : numpy.ndarray + Unwrapped phase, in radians. + conncomp : numpy.ndarray + Connected component labels, with the same shape as the unwrapped phase. + """ + scratchdir = Path(mkdtemp(dir=os.fspath(dir))) + return unwrap_func( + igram=igram, + coherence=coherence, + nlooks=nlooks, + scratchdir=scratchdir, + ) + + def coarse_unwrap( igram: da.Array, coherence: da.Array, @@ -211,6 +261,7 @@ def coarse_unwrap( unwrap_func: UnwrapCallback, downsample_factor: tuple[int, int], *, + scratchdir: Path, do_lowpass_filter: bool = True, shape_factor: float = 1.5, overhang: float = 0.5, @@ -242,6 +293,9 @@ def coarse_unwrap( downsample_factor : tuple of int The number of looks to take along each axis in order to form the low-resolution interferogram. + scratchdir : pathlib.Path + Scratch directory where intermediate processing artifacts may be written. Must + be an existing file system directory. do_lowpass_filter : bool, optional If True, apply a low-pass pre-filter prior to multilooking in order to reduce aliasing effects. Defaults to True. @@ -329,10 +383,12 @@ def to_single_chunk(arr: ArrayLike) -> da.Array: # Unwrap the downsampled data. unwrapped_lores, conncomp_lores = _util.map_blocks( - unwrap_func, + unwrap_in_tmpdir, igram_lores_singleblock, coherence_lores_singleblock, + unwrap_func=unwrap_func, nlooks=nlooks_lores, + dir=scratchdir, meta=(np.empty((), dtype=np.float32), np.empty((), dtype=np.uint32)), ) @@ -582,45 +638,48 @@ def multiscale_unwrap( igram_ = da.from_array(igram, chunks=chunksize, asarray=True) coherence_ = da.from_array(coherence, chunks=chunksize, asarray=True) - # Get a coarse estimate of the unwrapped phase using a low-resolution copy of the - # interferogram. - coarse_unwrapped, coarse_conncomp = coarse_unwrap( - igram=igram_, - coherence=coherence_, - nlooks=nlooks, - unwrap_func=unwrap_func, - downsample_factor=downsample_factor, - do_lowpass_filter=do_lowpass_filter, - shape_factor=shape_factor, - overhang=overhang, - ripple=ripple, - attenuation=attenuation, - ) - - # Unwrap each (high-resolution) tile independently. - tiled_unwrapped, tiled_conncomp = _util.map_blocks( - unwrap_func, - igram_, - coherence_, - nlooks=nlooks, - meta=(np.empty((), dtype=np.float32), np.empty((), dtype=np.uint32)), - ) - - # Add or subtract multiples of 2pi to each connected component to minimize the mean - # discrepancy between the high-res and low-res unwrapped phase (in order to correct - # for phase discontinuities between adjacent tiles). - multiscale_unwrapped = da.map_blocks( - adjust_conncomp_offset_cycles, - tiled_unwrapped, - tiled_conncomp, - coarse_unwrapped, - coarse_conncomp, - ) - # Create the scratch directory if it didn't exist. If no scratch directory was # supplied, create a temporary directory that will be cleaned up automatically when # the context block is exited. with _util.scratch_directory(scratchdir) as d: + # Get a coarse estimate of the unwrapped phase using a low-resolution copy of + # the interferogram. + coarse_unwrapped, coarse_conncomp = coarse_unwrap( + igram=igram_, + coherence=coherence_, + nlooks=nlooks, + unwrap_func=unwrap_func, + downsample_factor=downsample_factor, + scratchdir=d, + do_lowpass_filter=do_lowpass_filter, + shape_factor=shape_factor, + overhang=overhang, + ripple=ripple, + attenuation=attenuation, + ) + + # Unwrap each (high-resolution) tile independently. + tiled_unwrapped, tiled_conncomp = _util.map_blocks( + unwrap_in_tmpdir, + igram_, + coherence_, + unwrap_func=unwrap_func, + nlooks=nlooks, + dir=d, + meta=(np.empty((), dtype=np.float32), np.empty((), dtype=np.uint32)), + ) + + # Add or subtract multiples of 2pi to each connected component to minimize the + # mean discrepancy between the high-res and low-res unwrapped phase (in order to + # correct for phase discontinuities between adjacent tiles). + multiscale_unwrapped = da.map_blocks( + adjust_conncomp_offset_cycles, + tiled_unwrapped, + tiled_conncomp, + coarse_unwrapped, + coarse_conncomp, + ) + # Create temporary files to store the intermediate connected component labels # from coarse & tiled unwrapping. coarse_conncomp_tmpfile = unique_binary_file( diff --git a/src/tophu/_unwrap.py b/src/tophu/_unwrap.py index d4d065d..209c15c 100644 --- a/src/tophu/_unwrap.py +++ b/src/tophu/_unwrap.py @@ -4,7 +4,6 @@ import warnings from os import PathLike from pathlib import Path -from tempfile import TemporaryDirectory from typing import Literal, Protocol, runtime_checkable import isce3 @@ -36,6 +35,7 @@ def __call__( igram: NDArray[np.complexfloating], coherence: NDArray[np.floating], nlooks: float, + scratchdir: Path, ) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: """ Perform two-dimensional phase unwrapping. @@ -49,6 +49,8 @@ def __call__( same shape as the input interferogram. nlooks : float Effective number of spatial looks used to form the input coherence data. + scratchdir : pathlib.Path + Scratch directory where intermediate processing artifacts may be written. Returns ------- @@ -112,6 +114,7 @@ def __call__( igram: NDArray[np.complexfloating], coherence: NDArray[np.floating], nlooks: float, + scratchdir: Path, ) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: # Convert input arrays to GDAL rasters with the expected datatypes. igram_data = np.asanyarray(igram, dtype=np.complex64) @@ -137,6 +140,7 @@ def __call__( cost=self.cost, cost_params=self.cost_params, init_method=self.init_method, + scratchdir=scratchdir, ) return unwphase, conncomp @@ -396,6 +400,7 @@ def __call__( igram: NDArray[np.complexfloating], coherence: NDArray[np.floating], nlooks: float, + scratchdir: Path, ) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: # Configure ICU to unwrap the interferogram as a single tile (no bootstrapping). icu = isce3.unwrap.ICU( @@ -412,38 +417,34 @@ def __call__( min_cc_area=self.min_conncomp_area_frac, ) - # Create a temporary scratch directory to store intermediate rasters. - with TemporaryDirectory() as tmpdir: - d = Path(tmpdir) - - # Convert input arrays into rasters. - igram_raster = to_geotiff(d / "igram.tif", igram) - coherence_raster = to_geotiff(d / "coherence.tif", coherence) - - # Create zero-filled rasters to store output data. - unwphase_raster = create_geotiff( - d / "unwphase.tif", - width=igram.shape[1], - length=igram.shape[0], - dtype=np.float32, - ) - conncomp_raster = create_geotiff( - d / "conncomp.tif", - width=igram.shape[1], - length=igram.shape[0], - dtype=np.uint8, - ) - - # Run ICU. - icu.unwrap(unwphase_raster, conncomp_raster, igram_raster, coherence_raster) - - # Ensure changes to output rasters are flushed to disk and close the files. - del unwphase_raster - del conncomp_raster - - # Read output rasters into in-memory arrays. - unwphase = read_raster(d / "unwphase.tif") - conncomp = read_raster(d / "conncomp.tif") + # Convert input arrays into rasters. + igram_raster = to_geotiff(scratchdir / "igram.tif", igram) + coherence_raster = to_geotiff(scratchdir / "coherence.tif", coherence) + + # Create zero-filled rasters to store output data. + unwphase_raster = create_geotiff( + scratchdir / "unwphase.tif", + width=igram.shape[1], + length=igram.shape[0], + dtype=np.float32, + ) + conncomp_raster = create_geotiff( + scratchdir / "conncomp.tif", + width=igram.shape[1], + length=igram.shape[0], + dtype=np.uint8, + ) + + # Run ICU. + icu.unwrap(unwphase_raster, conncomp_raster, igram_raster, coherence_raster) + + # Ensure changes to output rasters are flushed to disk and close the files. + del unwphase_raster + del conncomp_raster + + # Read output rasters into in-memory arrays. + unwphase = read_raster(scratchdir / "unwphase.tif") + conncomp = read_raster(scratchdir / "conncomp.tif") return unwphase, conncomp @@ -490,6 +491,7 @@ def __call__( igram: NDArray[np.complexfloating], coherence: NDArray[np.floating], nlooks: float, + scratchdir: Path, ) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: # Configure ICU to unwrap the interferogram as a single tile (no bootstrapping). phass = isce3.unwrap.Phass( @@ -501,42 +503,38 @@ def __call__( # Get wrapped phase. wphase = np.angle(igram) - # Create a temporary scratch directory to store intermediate rasters. - with TemporaryDirectory() as tmpdir: - d = Path(tmpdir) - - # Convert input arrays into rasters. - wphase_raster = to_geotiff(d / "wphase.tif", wphase) - coherence_raster = to_geotiff(d / "coherence.tif", coherence) - - # Create zero-filled rasters to store output data. - unwphase_raster = create_geotiff( - d / "unwphase.tif", - width=igram.shape[1], - length=igram.shape[0], - dtype=np.float32, - ) - conncomp_raster = create_geotiff( - d / "conncomp.tif", - width=igram.shape[1], - length=igram.shape[0], - dtype=np.uint32, - ) - - # Run PHASS. - phass.unwrap( - wphase_raster, - coherence_raster, - unwphase_raster, - conncomp_raster, - ) - - # Ensure changes to output rasters are flushed to disk and close the files. - del unwphase_raster - del conncomp_raster - - # Read output rasters into in-memory arrays. - unwphase = read_raster(d / "unwphase.tif") - conncomp = read_raster(d / "conncomp.tif") + # Convert input arrays into rasters. + wphase_raster = to_geotiff(scratchdir / "wphase.tif", wphase) + coherence_raster = to_geotiff(scratchdir / "coherence.tif", coherence) + + # Create zero-filled rasters to store output data. + unwphase_raster = create_geotiff( + scratchdir / "unwphase.tif", + width=igram.shape[1], + length=igram.shape[0], + dtype=np.float32, + ) + conncomp_raster = create_geotiff( + scratchdir / "conncomp.tif", + width=igram.shape[1], + length=igram.shape[0], + dtype=np.uint32, + ) + + # Run PHASS. + phass.unwrap( + wphase_raster, + coherence_raster, + unwphase_raster, + conncomp_raster, + ) + + # Ensure changes to output rasters are flushed to disk and close the files. + del unwphase_raster + del conncomp_raster + + # Read output rasters into in-memory arrays. + unwphase = read_raster(scratchdir / "unwphase.tif") + conncomp = read_raster(scratchdir / "conncomp.tif") return unwphase, conncomp diff --git a/test/test_multiscale.py b/test/test_multiscale.py index 493f4d6..a73d179 100644 --- a/test/test_multiscale.py +++ b/test/test_multiscale.py @@ -1,5 +1,9 @@ from __future__ import annotations +import os +from pathlib import Path +from tempfile import TemporaryDirectory + import numpy as np import pytest from numpy.typing import ArrayLike, NDArray @@ -68,6 +72,28 @@ def jaccard_similarity(a: ArrayLike, b: ArrayLike) -> float: return np.sum(a & b) / np.sum(a | b) +def list_subdirs(dir: str | os.PathLike) -> list[Path]: + """ + List all immediate subdirectories of the specified directory. + + Parameters + ---------- + dir : path-like + The parent directory. Must be an existing file system directory. + + Returns + ------- + subdirs : list of pathlib.Path + All subdirectories of `dir`. + """ + dir = Path(dir) + + if not dir.is_dir(): + raise ValueError(f"'{dir}' is not an existing directory.") + + return [d for d in dir.iterdir() if d.is_dir()] + + class TestMultiScaleUnwrap: @pytest.mark.parametrize("length,width", [(1023, 1023), (1024, 1024)]) @pytest.mark.parametrize("unwrap_func", UNWRAP_FUNCS) @@ -477,3 +503,29 @@ def test_bad_overhang(self, overhang: float): ntiles=(2, 2), overhang=overhang, ) + + def test_scratchdir(self): + unwrapped, conncomp, igram, coherence = dummy_inputs_and_outputs() + + with TemporaryDirectory() as d: + scratchdir = Path(d) + + tophu.multiscale_unwrap( + unwrapped, + conncomp, + igram, + coherence, + nlooks=1.0, + unwrap_func=tophu.SnaphuUnwrap(), + downsample_factor=(3, 3), + ntiles=(2, 2), + scratchdir=scratchdir, + ) + + # Check that the scratch directory still exists. + assert scratchdir.is_dir() + + # The scratch directory should contain 5 unique subdirectories -- one for + # each tile and one for the coarse unwrapping step. + subdirs = list_subdirs(scratchdir) + assert len(subdirs) == 5 diff --git a/test/test_unwrap.py b/test/test_unwrap.py index 5ee8f69..f251628 100644 --- a/test/test_unwrap.py +++ b/test/test_unwrap.py @@ -1,3 +1,6 @@ +from pathlib import Path +from tempfile import TemporaryDirectory + import numpy as np import pytest from isce3.unwrap import snaphu @@ -100,7 +103,9 @@ def test_unwrap(self): igram = np.exp(1j * phase) # Unwrap. - unw, conncomp = unwrap(igram, coherence, nlooks) + with TemporaryDirectory() as d: + scratchdir = Path(d) + unw, conncomp = unwrap(igram, coherence, nlooks, scratchdir) # Get a mask of valid pixels (pixels that were assigned to some connected # component). @@ -123,6 +128,31 @@ def test_unwrap(self): assert unique_labels == {1} assert frac_nonzero(conncomp) > 0.95 + def test_scratchdir(self): + # Create dummy interferogram & coherence data. + length, width = 256, 256 + igram = np.zeros((length, width), dtype=np.complex64) + coherence = np.ones((length, width), dtype=np.float32) + + # Init unwrap callback functor. + unwrap = tophu.SnaphuUnwrap() + + with TemporaryDirectory() as d: + scratchdir = Path(d) + + # Unwrap. + unwrap(igram, coherence, 1.0, scratchdir) + + # Check directory contents. + for filename in [ + "snaphu.conf", + "igram.c8", + "corr.f4", + "unw.f4", + "conncomp.u4", + ]: + assert (scratchdir / filename).is_file() + class TestICUUnwrap: def test_interface(self): @@ -207,7 +237,9 @@ def test_unwrap(self): unwrap = tophu.ICUUnwrap(min_coherence=0.5) # Unwrap. - unw, conncomp = unwrap(igram, coherence, nlooks) + with TemporaryDirectory() as d: + scratchdir = Path(d) + unw, conncomp = unwrap(igram, coherence, nlooks, scratchdir) # Check the number of unique nonzero connected component labels. unique_labels = set(np.unique(conncomp)) @@ -239,6 +271,30 @@ def test_unwrap(self): good_pixels = np.isclose(unw[mask] + off, phase[mask], rtol=1e-5, atol=1e-5) assert frac_nonzero(good_pixels) > 0.95 + def test_scratchdir(self): + # Create dummy interferogram & coherence data. + length, width = 256, 256 + igram = np.zeros((length, width), dtype=np.complex64) + coherence = np.ones((length, width), dtype=np.float32) + + # Init unwrap callback functor. + unwrap = tophu.ICUUnwrap() + + with TemporaryDirectory() as d: + scratchdir = Path(d) + + # Unwrap. + unwrap(igram, coherence, 1.0, scratchdir) + + # Check directory contents. + for filename in [ + "igram.tif", + "coherence.tif", + "unwphase.tif", + "conncomp.tif", + ]: + assert (scratchdir / filename).is_file() + class TestPhassUnwrap: def test_interface(self): @@ -292,7 +348,9 @@ def test_unwrap(self): unwrap = tophu.PhassUnwrap(coherence_thresh=0.5) # Unwrap. - unw, conncomp = unwrap(igram, coherence, nlooks) + with TemporaryDirectory() as d: + scratchdir = Path(d) + unw, conncomp = unwrap(igram, coherence, nlooks, scratchdir) # Check the number of unique nonzero connected component labels. unique_labels = set(np.unique(conncomp)) @@ -323,3 +381,27 @@ def test_unwrap(self): off = round_to_nearest(np.mean(phasediff[mask]), 2.0 * np.pi) good_pixels = np.isclose(unw[mask] + off, phase[mask], rtol=1e-5, atol=1e-5) assert frac_nonzero(good_pixels) > 0.95 + + def test_scratchdir(self): + # Create dummy interferogram & coherence data. + length, width = 256, 256 + igram = np.zeros((length, width), dtype=np.complex64) + coherence = np.ones((length, width), dtype=np.float32) + + # Init unwrap callback functor. + unwrap = tophu.PhassUnwrap() + + with TemporaryDirectory() as d: + scratchdir = Path(d) + + # Unwrap. + unwrap(igram, coherence, 1.0, scratchdir) + + # Check directory contents. + for filename in [ + "wphase.tif", + "coherence.tif", + "unwphase.tif", + "conncomp.tif", + ]: + assert (scratchdir / filename).is_file()