diff --git a/src/tophu/_multiscale.py b/src/tophu/_multiscale.py index 9cd9619..50b12ea 100644 --- a/src/tophu/_multiscale.py +++ b/src/tophu/_multiscale.py @@ -2,7 +2,8 @@ import os import warnings -from tempfile import NamedTemporaryFile +from pathlib import Path +from tempfile import NamedTemporaryFile, mkdtemp import dask.array as da import numpy as np @@ -204,6 +205,55 @@ def check_congruence( return wrapped_hires + 2.0 * np.pi * diff_cycles_hires +def unwrap_in_tmpdir( + igram: NDArray[np.complexfloating], + coherence: NDArray[np.floating], + *, + unwrap_func: UnwrapCallback, + nlooks: float, + dir: Path, +) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: + """ + Perform phase unwrapping in a uniquely-named subdirectory. + + This function acts as a wrapper around `unwrap_func`. It creates a new unique + subdirectory in `dir` to be used as a scratch directory for phase unwrapping. As + long as `unwrap_func` only writes to the scratch directory, data races will not + occur when `unwrap_in_tmpdir` is executed in parallel. + + The scratch directory does not get automatically cleaned up after processing. + + Parameters + ---------- + igram : numpy.ndarray + Input interferogram. + coherence : numpy.ndarray + Sample coherence coefficient, normalized to the interval [0, 1], with the + same shape as the input interferogram. + unwrap_func : UnwrapCallback + A callback function used to unwrap the interferogram. + nlooks : float + Effective number of spatial looks used to form the input coherence data. + dir : pathlib.Path + Parent directory to create the scratch directory inside. Must be an existing + file system directory. + + Returns + ------- + unwphase : numpy.ndarray + Unwrapped phase, in radians. + conncomp : numpy.ndarray + Connected component labels, with the same shape as the unwrapped phase. + """ + scratchdir = Path(mkdtemp(dir=os.fspath(dir))) + return unwrap_func( + igram=igram, + coherence=coherence, + nlooks=nlooks, + scratchdir=scratchdir, + ) + + def coarse_unwrap( igram: da.Array, coherence: da.Array, @@ -211,6 +261,7 @@ def coarse_unwrap( unwrap_func: UnwrapCallback, downsample_factor: tuple[int, int], *, + scratchdir: Path, do_lowpass_filter: bool = True, shape_factor: float = 1.5, overhang: float = 0.5, @@ -242,6 +293,9 @@ def coarse_unwrap( downsample_factor : tuple of int The number of looks to take along each axis in order to form the low-resolution interferogram. + scratchdir : pathlib.Path + Scratch directory where intermediate processing artifacts may be written. Must + be an existing file system directory. do_lowpass_filter : bool, optional If True, apply a low-pass pre-filter prior to multilooking in order to reduce aliasing effects. Defaults to True. @@ -329,10 +383,12 @@ def to_single_chunk(arr: ArrayLike) -> da.Array: # Unwrap the downsampled data. unwrapped_lores, conncomp_lores = _util.map_blocks( - unwrap_func, + unwrap_in_tmpdir, igram_lores_singleblock, coherence_lores_singleblock, + unwrap_func=unwrap_func, nlooks=nlooks_lores, + dir=scratchdir, meta=(np.empty((), dtype=np.float32), np.empty((), dtype=np.uint32)), ) @@ -582,45 +638,48 @@ def multiscale_unwrap( igram_ = da.from_array(igram, chunks=chunksize, asarray=True) coherence_ = da.from_array(coherence, chunks=chunksize, asarray=True) - # Get a coarse estimate of the unwrapped phase using a low-resolution copy of the - # interferogram. - coarse_unwrapped, coarse_conncomp = coarse_unwrap( - igram=igram_, - coherence=coherence_, - nlooks=nlooks, - unwrap_func=unwrap_func, - downsample_factor=downsample_factor, - do_lowpass_filter=do_lowpass_filter, - shape_factor=shape_factor, - overhang=overhang, - ripple=ripple, - attenuation=attenuation, - ) - - # Unwrap each (high-resolution) tile independently. - tiled_unwrapped, tiled_conncomp = _util.map_blocks( - unwrap_func, - igram_, - coherence_, - nlooks=nlooks, - meta=(np.empty((), dtype=np.float32), np.empty((), dtype=np.uint32)), - ) - - # Add or subtract multiples of 2pi to each connected component to minimize the mean - # discrepancy between the high-res and low-res unwrapped phase (in order to correct - # for phase discontinuities between adjacent tiles). - multiscale_unwrapped = da.map_blocks( - adjust_conncomp_offset_cycles, - tiled_unwrapped, - tiled_conncomp, - coarse_unwrapped, - coarse_conncomp, - ) - # Create the scratch directory if it didn't exist. If no scratch directory was # supplied, create a temporary directory that will be cleaned up automatically when # the context block is exited. with _util.scratch_directory(scratchdir) as d: + # Get a coarse estimate of the unwrapped phase using a low-resolution copy of + # the interferogram. + coarse_unwrapped, coarse_conncomp = coarse_unwrap( + igram=igram_, + coherence=coherence_, + nlooks=nlooks, + unwrap_func=unwrap_func, + downsample_factor=downsample_factor, + scratchdir=d, + do_lowpass_filter=do_lowpass_filter, + shape_factor=shape_factor, + overhang=overhang, + ripple=ripple, + attenuation=attenuation, + ) + + # Unwrap each (high-resolution) tile independently. + tiled_unwrapped, tiled_conncomp = _util.map_blocks( + unwrap_in_tmpdir, + igram_, + coherence_, + unwrap_func=unwrap_func, + nlooks=nlooks, + dir=d, + meta=(np.empty((), dtype=np.float32), np.empty((), dtype=np.uint32)), + ) + + # Add or subtract multiples of 2pi to each connected component to minimize the + # mean discrepancy between the high-res and low-res unwrapped phase (in order to + # correct for phase discontinuities between adjacent tiles). + multiscale_unwrapped = da.map_blocks( + adjust_conncomp_offset_cycles, + tiled_unwrapped, + tiled_conncomp, + coarse_unwrapped, + coarse_conncomp, + ) + # Create temporary files to store the intermediate connected component labels # from coarse & tiled unwrapping. coarse_conncomp_tmpfile = unique_binary_file( diff --git a/src/tophu/_unwrap.py b/src/tophu/_unwrap.py index d4d065d..209c15c 100644 --- a/src/tophu/_unwrap.py +++ b/src/tophu/_unwrap.py @@ -4,7 +4,6 @@ import warnings from os import PathLike from pathlib import Path -from tempfile import TemporaryDirectory from typing import Literal, Protocol, runtime_checkable import isce3 @@ -36,6 +35,7 @@ def __call__( igram: NDArray[np.complexfloating], coherence: NDArray[np.floating], nlooks: float, + scratchdir: Path, ) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: """ Perform two-dimensional phase unwrapping. @@ -49,6 +49,8 @@ def __call__( same shape as the input interferogram. nlooks : float Effective number of spatial looks used to form the input coherence data. + scratchdir : pathlib.Path + Scratch directory where intermediate processing artifacts may be written. Returns ------- @@ -112,6 +114,7 @@ def __call__( igram: NDArray[np.complexfloating], coherence: NDArray[np.floating], nlooks: float, + scratchdir: Path, ) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: # Convert input arrays to GDAL rasters with the expected datatypes. igram_data = np.asanyarray(igram, dtype=np.complex64) @@ -137,6 +140,7 @@ def __call__( cost=self.cost, cost_params=self.cost_params, init_method=self.init_method, + scratchdir=scratchdir, ) return unwphase, conncomp @@ -396,6 +400,7 @@ def __call__( igram: NDArray[np.complexfloating], coherence: NDArray[np.floating], nlooks: float, + scratchdir: Path, ) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: # Configure ICU to unwrap the interferogram as a single tile (no bootstrapping). icu = isce3.unwrap.ICU( @@ -412,38 +417,34 @@ def __call__( min_cc_area=self.min_conncomp_area_frac, ) - # Create a temporary scratch directory to store intermediate rasters. - with TemporaryDirectory() as tmpdir: - d = Path(tmpdir) - - # Convert input arrays into rasters. - igram_raster = to_geotiff(d / "igram.tif", igram) - coherence_raster = to_geotiff(d / "coherence.tif", coherence) - - # Create zero-filled rasters to store output data. - unwphase_raster = create_geotiff( - d / "unwphase.tif", - width=igram.shape[1], - length=igram.shape[0], - dtype=np.float32, - ) - conncomp_raster = create_geotiff( - d / "conncomp.tif", - width=igram.shape[1], - length=igram.shape[0], - dtype=np.uint8, - ) - - # Run ICU. - icu.unwrap(unwphase_raster, conncomp_raster, igram_raster, coherence_raster) - - # Ensure changes to output rasters are flushed to disk and close the files. - del unwphase_raster - del conncomp_raster - - # Read output rasters into in-memory arrays. - unwphase = read_raster(d / "unwphase.tif") - conncomp = read_raster(d / "conncomp.tif") + # Convert input arrays into rasters. + igram_raster = to_geotiff(scratchdir / "igram.tif", igram) + coherence_raster = to_geotiff(scratchdir / "coherence.tif", coherence) + + # Create zero-filled rasters to store output data. + unwphase_raster = create_geotiff( + scratchdir / "unwphase.tif", + width=igram.shape[1], + length=igram.shape[0], + dtype=np.float32, + ) + conncomp_raster = create_geotiff( + scratchdir / "conncomp.tif", + width=igram.shape[1], + length=igram.shape[0], + dtype=np.uint8, + ) + + # Run ICU. + icu.unwrap(unwphase_raster, conncomp_raster, igram_raster, coherence_raster) + + # Ensure changes to output rasters are flushed to disk and close the files. + del unwphase_raster + del conncomp_raster + + # Read output rasters into in-memory arrays. + unwphase = read_raster(scratchdir / "unwphase.tif") + conncomp = read_raster(scratchdir / "conncomp.tif") return unwphase, conncomp @@ -490,6 +491,7 @@ def __call__( igram: NDArray[np.complexfloating], coherence: NDArray[np.floating], nlooks: float, + scratchdir: Path, ) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: # Configure ICU to unwrap the interferogram as a single tile (no bootstrapping). phass = isce3.unwrap.Phass( @@ -501,42 +503,38 @@ def __call__( # Get wrapped phase. wphase = np.angle(igram) - # Create a temporary scratch directory to store intermediate rasters. - with TemporaryDirectory() as tmpdir: - d = Path(tmpdir) - - # Convert input arrays into rasters. - wphase_raster = to_geotiff(d / "wphase.tif", wphase) - coherence_raster = to_geotiff(d / "coherence.tif", coherence) - - # Create zero-filled rasters to store output data. - unwphase_raster = create_geotiff( - d / "unwphase.tif", - width=igram.shape[1], - length=igram.shape[0], - dtype=np.float32, - ) - conncomp_raster = create_geotiff( - d / "conncomp.tif", - width=igram.shape[1], - length=igram.shape[0], - dtype=np.uint32, - ) - - # Run PHASS. - phass.unwrap( - wphase_raster, - coherence_raster, - unwphase_raster, - conncomp_raster, - ) - - # Ensure changes to output rasters are flushed to disk and close the files. - del unwphase_raster - del conncomp_raster - - # Read output rasters into in-memory arrays. - unwphase = read_raster(d / "unwphase.tif") - conncomp = read_raster(d / "conncomp.tif") + # Convert input arrays into rasters. + wphase_raster = to_geotiff(scratchdir / "wphase.tif", wphase) + coherence_raster = to_geotiff(scratchdir / "coherence.tif", coherence) + + # Create zero-filled rasters to store output data. + unwphase_raster = create_geotiff( + scratchdir / "unwphase.tif", + width=igram.shape[1], + length=igram.shape[0], + dtype=np.float32, + ) + conncomp_raster = create_geotiff( + scratchdir / "conncomp.tif", + width=igram.shape[1], + length=igram.shape[0], + dtype=np.uint32, + ) + + # Run PHASS. + phass.unwrap( + wphase_raster, + coherence_raster, + unwphase_raster, + conncomp_raster, + ) + + # Ensure changes to output rasters are flushed to disk and close the files. + del unwphase_raster + del conncomp_raster + + # Read output rasters into in-memory arrays. + unwphase = read_raster(scratchdir / "unwphase.tif") + conncomp = read_raster(scratchdir / "conncomp.tif") return unwphase, conncomp diff --git a/test/test_multiscale.py b/test/test_multiscale.py index 493f4d6..a73d179 100644 --- a/test/test_multiscale.py +++ b/test/test_multiscale.py @@ -1,5 +1,9 @@ from __future__ import annotations +import os +from pathlib import Path +from tempfile import TemporaryDirectory + import numpy as np import pytest from numpy.typing import ArrayLike, NDArray @@ -68,6 +72,28 @@ def jaccard_similarity(a: ArrayLike, b: ArrayLike) -> float: return np.sum(a & b) / np.sum(a | b) +def list_subdirs(dir: str | os.PathLike) -> list[Path]: + """ + List all immediate subdirectories of the specified directory. + + Parameters + ---------- + dir : path-like + The parent directory. Must be an existing file system directory. + + Returns + ------- + subdirs : list of pathlib.Path + All subdirectories of `dir`. + """ + dir = Path(dir) + + if not dir.is_dir(): + raise ValueError(f"'{dir}' is not an existing directory.") + + return [d for d in dir.iterdir() if d.is_dir()] + + class TestMultiScaleUnwrap: @pytest.mark.parametrize("length,width", [(1023, 1023), (1024, 1024)]) @pytest.mark.parametrize("unwrap_func", UNWRAP_FUNCS) @@ -477,3 +503,29 @@ def test_bad_overhang(self, overhang: float): ntiles=(2, 2), overhang=overhang, ) + + def test_scratchdir(self): + unwrapped, conncomp, igram, coherence = dummy_inputs_and_outputs() + + with TemporaryDirectory() as d: + scratchdir = Path(d) + + tophu.multiscale_unwrap( + unwrapped, + conncomp, + igram, + coherence, + nlooks=1.0, + unwrap_func=tophu.SnaphuUnwrap(), + downsample_factor=(3, 3), + ntiles=(2, 2), + scratchdir=scratchdir, + ) + + # Check that the scratch directory still exists. + assert scratchdir.is_dir() + + # The scratch directory should contain 5 unique subdirectories -- one for + # each tile and one for the coarse unwrapping step. + subdirs = list_subdirs(scratchdir) + assert len(subdirs) == 5 diff --git a/test/test_unwrap.py b/test/test_unwrap.py index 5ee8f69..f251628 100644 --- a/test/test_unwrap.py +++ b/test/test_unwrap.py @@ -1,3 +1,6 @@ +from pathlib import Path +from tempfile import TemporaryDirectory + import numpy as np import pytest from isce3.unwrap import snaphu @@ -100,7 +103,9 @@ def test_unwrap(self): igram = np.exp(1j * phase) # Unwrap. - unw, conncomp = unwrap(igram, coherence, nlooks) + with TemporaryDirectory() as d: + scratchdir = Path(d) + unw, conncomp = unwrap(igram, coherence, nlooks, scratchdir) # Get a mask of valid pixels (pixels that were assigned to some connected # component). @@ -123,6 +128,31 @@ def test_unwrap(self): assert unique_labels == {1} assert frac_nonzero(conncomp) > 0.95 + def test_scratchdir(self): + # Create dummy interferogram & coherence data. + length, width = 256, 256 + igram = np.zeros((length, width), dtype=np.complex64) + coherence = np.ones((length, width), dtype=np.float32) + + # Init unwrap callback functor. + unwrap = tophu.SnaphuUnwrap() + + with TemporaryDirectory() as d: + scratchdir = Path(d) + + # Unwrap. + unwrap(igram, coherence, 1.0, scratchdir) + + # Check directory contents. + for filename in [ + "snaphu.conf", + "igram.c8", + "corr.f4", + "unw.f4", + "conncomp.u4", + ]: + assert (scratchdir / filename).is_file() + class TestICUUnwrap: def test_interface(self): @@ -207,7 +237,9 @@ def test_unwrap(self): unwrap = tophu.ICUUnwrap(min_coherence=0.5) # Unwrap. - unw, conncomp = unwrap(igram, coherence, nlooks) + with TemporaryDirectory() as d: + scratchdir = Path(d) + unw, conncomp = unwrap(igram, coherence, nlooks, scratchdir) # Check the number of unique nonzero connected component labels. unique_labels = set(np.unique(conncomp)) @@ -239,6 +271,30 @@ def test_unwrap(self): good_pixels = np.isclose(unw[mask] + off, phase[mask], rtol=1e-5, atol=1e-5) assert frac_nonzero(good_pixels) > 0.95 + def test_scratchdir(self): + # Create dummy interferogram & coherence data. + length, width = 256, 256 + igram = np.zeros((length, width), dtype=np.complex64) + coherence = np.ones((length, width), dtype=np.float32) + + # Init unwrap callback functor. + unwrap = tophu.ICUUnwrap() + + with TemporaryDirectory() as d: + scratchdir = Path(d) + + # Unwrap. + unwrap(igram, coherence, 1.0, scratchdir) + + # Check directory contents. + for filename in [ + "igram.tif", + "coherence.tif", + "unwphase.tif", + "conncomp.tif", + ]: + assert (scratchdir / filename).is_file() + class TestPhassUnwrap: def test_interface(self): @@ -292,7 +348,9 @@ def test_unwrap(self): unwrap = tophu.PhassUnwrap(coherence_thresh=0.5) # Unwrap. - unw, conncomp = unwrap(igram, coherence, nlooks) + with TemporaryDirectory() as d: + scratchdir = Path(d) + unw, conncomp = unwrap(igram, coherence, nlooks, scratchdir) # Check the number of unique nonzero connected component labels. unique_labels = set(np.unique(conncomp)) @@ -323,3 +381,27 @@ def test_unwrap(self): off = round_to_nearest(np.mean(phasediff[mask]), 2.0 * np.pi) good_pixels = np.isclose(unw[mask] + off, phase[mask], rtol=1e-5, atol=1e-5) assert frac_nonzero(good_pixels) > 0.95 + + def test_scratchdir(self): + # Create dummy interferogram & coherence data. + length, width = 256, 256 + igram = np.zeros((length, width), dtype=np.complex64) + coherence = np.ones((length, width), dtype=np.float32) + + # Init unwrap callback functor. + unwrap = tophu.PhassUnwrap() + + with TemporaryDirectory() as d: + scratchdir = Path(d) + + # Unwrap. + unwrap(igram, coherence, 1.0, scratchdir) + + # Check directory contents. + for filename in [ + "wphase.tif", + "coherence.tif", + "unwphase.tif", + "conncomp.tif", + ]: + assert (scratchdir / filename).is_file()