[POC] first try of multiprocessing for reprojection 2/2 #648

Open
3 tasks
adebardo opened this issue Feb 6, 2025 · 1 comment


@adebardo

adebardo commented Feb 6, 2025

Ticket No. 2

Context

The goal of this ticket is to implement tiled reprojection to make it easier to process "heavy" datasets, such as 40,000 × 40,000 CARS DSMs.
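For a sense of scale, assuming a single float32 band (4 bytes per pixel; the actual data type of the CARS DSMs is an assumption here), a full DSM compares with a single 1,024 × 1,024 tile as follows:

$$
40\,000^2\ \text{px} \times 4\ \text{B/px} = 6.4 \times 10^{9}\ \text{B} \approx 6.4\ \text{GB},
\qquad
1\,024^2\ \text{px} \times 4\ \text{B/px} = 4\,194\,304\ \text{B} \approx 4.2\ \text{MB}.
$$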

Proposal

We noticed that this need had already been anticipated with the creation of the file [delayed.py](https://github.com/GlacioHack/geoutils/blob/main/geoutils/raster/delayed.py). Unfortunately, this file relies on Dask, a library for parallel and distributed computing. Based on past experience, however, the CS 3D development team does not feel comfortable maintaining such a component for ICC needs, for the following reasons:

  • Memory management issues
  • Significant time lost on debugging and maintenance
  • Dask is only effective when the underlying objects are designed around its philosophy from the start, which here would require several months of development

For these reasons, we propose implementing a delayed_multiproc module as an alternative to the current delayed implementation.

Implementation (continued)

  • Convert each of the existing functions into a multiprocessing version in delayed_multiproc.py (7 functions in total).
    Example:
```python
from __future__ import annotations

import multiprocessing
from typing import Union

import dask
import numpy as np

# Array type aliases used throughout geoutils
from geoutils._typing import NDArrayBool, NDArrayNum

## DASK
@dask.delayed  # type: ignore
def _delayed_nb_valids(arr_chunk: NDArrayNum | NDArrayBool) -> NDArrayNum:
    """Count number of valid values per block."""
    if arr_chunk.dtype == "bool":
        return np.array([np.count_nonzero(arr_chunk)]).reshape((1, 1))
    return np.array([np.count_nonzero(np.isfinite(arr_chunk))]).reshape((1, 1))

## MULTIPROCESSING
def _nb_valids(arr_chunk: Union[NDArrayNum, NDArrayBool]) -> NDArrayNum:
    """Count number of valid values per block."""
    # Boolean masks: count True values; numeric arrays: count finite values.
    if arr_chunk.dtype == "bool":
        return np.array([np.count_nonzero(arr_chunk)]).reshape((1, 1))
    return np.array([np.count_nonzero(np.isfinite(arr_chunk))]).reshape((1, 1))

def parallel_nb_valids(chunks: list[Union[NDArrayNum, NDArrayBool]], num_workers: int = 4) -> list[NDArrayNum]:
    """Parallel execution of _nb_valids using multiprocessing."""
    # pool.map returns one (1, 1) array per chunk, in the same order as `chunks`.
    with multiprocessing.Pool(processes=num_workers) as pool:
        results = pool.map(_nb_valids, chunks)
    return results
```
  • Since the current code uses Dask arrays, make sure the multiprocessing functions never receive Dask arrays, to avoid confusion; convert them to NumPy arrays where necessary.
  • Handle the dask.compute() calls: these calls trigger execution of the dask.delayed() functions, so they must be replaced with equivalent multiprocessing implementations (4 instances in total).
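A minimal sketch of what replacing one dask.compute() call could look like, assuming the input has already been converted to an in-memory NumPy array. It splits the array into tiles, maps the per-tile count over them with `multiprocessing.Pool`, and reassembles the per-tile results into a grid with `np.block`. The helpers `_split_into_tiles` and `count_valids_mp` and the default tile size are hypothetical names for illustration, not existing geoutils functions; `_nb_valids` is repeated from the example above so the snippet runs on its own.

```python
import multiprocessing

import numpy as np


def _nb_valids(arr_chunk: np.ndarray) -> np.ndarray:
    """Count number of valid values per block (same logic as in the proposal)."""
    if arr_chunk.dtype == "bool":
        return np.array([np.count_nonzero(arr_chunk)]).reshape((1, 1))
    return np.array([np.count_nonzero(np.isfinite(arr_chunk))]).reshape((1, 1))


def _split_into_tiles(arr: np.ndarray, tile_size: int) -> list[list[np.ndarray]]:
    """Hypothetical helper: split a 2D array into rows of tiles (edge tiles may be smaller)."""
    return [
        [arr[i : i + tile_size, j : j + tile_size] for j in range(0, arr.shape[1], tile_size)]
        for i in range(0, arr.shape[0], tile_size)
    ]


def count_valids_mp(arr: np.ndarray, tile_size: int = 256, num_workers: int = 4) -> np.ndarray:
    """Hypothetical stand-in for a dask.compute() call: returns the grid of per-tile counts."""
    tile_rows = _split_into_tiles(arr, tile_size)
    flat = [tile for row in tile_rows for tile in row]
    with multiprocessing.Pool(processes=num_workers) as pool:
        # pool.map preserves input order, so results can be regrouped row by row.
        results = pool.map(_nb_valids, flat)
    n_cols = len(tile_rows[0])
    grid = [results[k : k + n_cols] for k in range(0, len(results), n_cols)]
    # Each result is a (1, 1) array, so np.block yields an (n_tile_rows, n_tile_cols) array.
    return np.block(grid)
```

For example, `count_valids_mp(arr, tile_size=256, num_workers=2).sum()` gives the total number of valid pixels, which is what the Dask version obtains after computing the delayed per-chunk counts.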

Tests

Adapt the Dask tests to multiprocessing.

Documentation

Update the documentation.

@rhugonnet
Member

(Commenting here for both this issue and #647)
Great! This follows what we discussed in December with @steuxyo 😁

I have two main remarks at this point (we can discuss more during the call next week!):

  1. If I'm not mistaken, there is another aspect in which the two implementations above differ: the Dask implementation can handle chunk reading implicitly from the full da.Array with defined chunks (so it does not need much more than what is written above), while the multiprocessing one will need to be paired with custom reading through rasterio.windows (and a custom tile definition, but this is covered in [POC] Tiling for multiprocessing #649). Should this "custom reading step" then be added within the multiprocessing.Pool call above?

  2. Then comes the question of later integrating these functions into the Raster class. It is doable, but not trivial, for two reasons: (i) Some raster operations have a "delayed" effect, such as crop() (and the recent Raster(roi=)) or Raster(downsample=). We might have to reconcile their behaviour with delayed multiprocessing (for a start, we could simply flag that a delayed operation is pending and raise an error when trying to combine it with delayed multiprocessing, to avoid conflicts). (ii) I might be wrong here, but I don't think we will be able to chain several steps, or call a final compute("myfilepath.tif"), as easily as with Dask: each call will be a single multiprocessing step. If that is indeed the case, every Raster function producing a large output will need a new argument pointing to an output file, like Raster.reproject(filename_delayed_mp=...), so that the output can be written out-of-memory (this won't concern subsample() or interp()).
